Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -1565,10 +1565,11 @@ In the list below, the names of required properties appear in bold. Any other pr

### Output Attributes

| Attribute | Relationship | Description |
|------------|--------------|----------------------------------------------|
| udp.port | | The sending port the messages were received. |
| udp.sender | | The sending host of the messages. |
| Attribute | Relationship | Description |
|-----------------|--------------|---------------------------------------------------------|
| udp.port | | The listening port on which the messages were received. |
| udp.sender | | The sending host of the messages. |
| udp.sender.port | | The sending port of the messages. |


## ListFile
Expand Down
15 changes: 12 additions & 3 deletions core-framework/include/utils/MinifiConcurrentQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@
#pragma once

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <utility>
#include <optional>
#include <stdexcept>
#include <atomic>
#include <utility>

#include "utils/TryMoveCall.h"
#include "minifi-cpp/utils/gsl.h"

namespace org::apache::nifi::minifi::utils {

Expand Down Expand Up @@ -56,6 +58,13 @@ class ConcurrentQueue {
return tryDequeueImpl(lck, out);
}

std::optional<T> tryDequeue() {
std::optional<T> result = std::nullopt;
const bool consume_result = consume([&result](T value) { result = std::move(value); });
gsl_Assert(consume_result == result.has_value());
return result;
}

template<typename Functor>
bool consume(Functor&& fun) {
std::unique_lock<std::mutex> lck(mtx_);
Expand Down
15 changes: 8 additions & 7 deletions extension-framework/include/utils/net/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@
namespace org::apache::nifi::minifi::utils::net {

struct Message {
public:
Message() = default;
Message(std::string message_data, IpProtocol protocol, asio::ip::address sender_address, asio::ip::port_type server_port)
Message() = delete;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old default constructor left a few members uninitialized. It used to be needed because objects were assigned from an "empty" state through out params. Refactored those instances, so that no more default constructed "empty" messages are necessary.

Message(std::string message_data, IpProtocol protocol, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port)
: message_data(std::move(message_data)),
protocol(protocol),
server_port(server_port),
sender_address(std::move(sender_address)) {
remote_address(std::move(remote_address)),
remote_port(remote_port),
local_port(local_port) {
}

bool is_partial = false;
std::string message_data;
IpProtocol protocol;
asio::ip::port_type server_port;
asio::ip::address sender_address;
asio::ip::address remote_address;
asio::ip::port_type remote_port;
asio::ip::port_type local_port;
};

} // namespace org::apache::nifi::minifi::utils::net
8 changes: 6 additions & 2 deletions extension-framework/include/utils/net/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ class Server {
bool queueEmpty() {
return concurrent_queue_.empty();
}
bool tryDequeue(utils::net::Message& received_message) {
return concurrent_queue_.tryDequeue(received_message);
std::optional<utils::net::Message> tryDequeue() {
return concurrent_queue_.tryDequeue();
}
Server(const Server&) = delete;
Server(Server&&) = delete;
Server& operator=(const Server&) = delete;
Server& operator=(Server&&) = delete;
virtual ~Server() {
Comment on lines +53 to 57
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rule of 5 (same with TcpClient and NetworkListenerProcessor)
https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#rc-five

stop();
}
Expand Down
6 changes: 3 additions & 3 deletions extension-framework/include/utils/net/TcpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class TcpServer : public Server {
protected:
asio::awaitable<void> doReceive() override;

asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port);
asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port);
asio::awaitable<void> insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port);
asio::awaitable<void> secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port);

asio::awaitable<void> readLoop(auto& socket, const auto& remote_address, const auto& local_port);
asio::awaitable<void> readLoop(auto& socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port);

bool consume_delimiter_;
const std::string delimiter_;
Expand Down
2 changes: 0 additions & 2 deletions extension-framework/include/utils/net/UdpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

#include <optional>
#include <memory>
#include <string>
#include <asio/awaitable.hpp>

#include "Server.h"
#include "utils/MinifiConcurrentQueue.h"
#include "minifi-cpp/core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"

Expand Down
29 changes: 16 additions & 13 deletions extension-framework/src/utils/net/TcpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,23 @@ asio::awaitable<void> TcpServer::doReceive() {
continue;
}
std::error_code error;
auto remote_address = socket.lowest_layer().remote_endpoint(error).address();
if (error)
auto remote_endpoint = socket.lowest_layer().remote_endpoint(error);
if (error) {
logger_->log_warn("Error during fetching remote endpoint: {}", error.message());
}
auto local_port = socket.lowest_layer().local_endpoint(error).port();
if (error)
if (error) {
logger_->log_warn("Error during fetching local endpoint: {}", error.message());
if (ssl_data_)
co_spawn(io_context_, secureSession(std::move(socket), std::move(remote_address), local_port), asio::detached);
else
co_spawn(io_context_, insecureSession(std::move(socket), std::move(remote_address), local_port), asio::detached);
}
if (ssl_data_) {
co_spawn(io_context_, secureSession(std::move(socket), remote_endpoint.address(), remote_endpoint.port(), local_port), asio::detached);
} else {
co_spawn(io_context_, insecureSession(std::move(socket), remote_endpoint.address(), remote_endpoint.port(), local_port), asio::detached);
}
}
}

asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& remote_address, const auto& local_port) {
asio::awaitable<void> TcpServer::readLoop(auto& socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) {
std::string read_message;
while (true) {
auto [read_error, bytes_read] = co_await asio::async_read_until(socket, asio::dynamic_buffer(read_message), delimiter_, use_nothrow_awaitable); // NOLINT
Expand All @@ -65,16 +68,16 @@ asio::awaitable<void> TcpServer::readLoop(auto& socket, const auto& remote_addre

if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
auto message_str = read_message.substr(0, bytes_read - (consume_delimiter_ ? delimiter_.size() : 0));
concurrent_queue_.enqueue(Message(std::move(message_str), IpProtocol::TCP, remote_address, local_port));
concurrent_queue_.enqueue(Message(std::move(message_str), IpProtocol::TCP, remote_address, remote_port, local_port));
} else {
logger_->log_warn("Queue is full. TCP message ignored.");
}
read_message.erase(0, bytes_read);
}
}

asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port) {
co_return co_await readLoop(socket, remote_address, local_port); // NOLINT
asio::awaitable<void> TcpServer::insecureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) {
co_return co_await readLoop(socket, remote_address, remote_port, local_port); // NOLINT
}

namespace {
Expand All @@ -95,7 +98,7 @@ asio::ssl::context setupSslContext(SslServerOptions& ssl_data) {
}
} // namespace

asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type local_port) {
asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asio::ip::address remote_address, asio::ip::port_type remote_port, asio::ip::port_type local_port) {
gsl_Expects(ssl_data_);
auto ssl_context = setupSslContext(*ssl_data_);
SslSocket ssl_socket(std::move(socket), ssl_context);
Expand All @@ -104,7 +107,7 @@ asio::awaitable<void> TcpServer::secureSession(asio::ip::tcp::socket socket, asi
logger_->log_warn("Handshake with {} failed due to {}", remote_address, handshake_error.message());
co_return;
}
co_await readLoop(ssl_socket, remote_address, local_port); // NOLINT
co_await readLoop(ssl_socket, remote_address, remote_port, local_port);

asio::error_code ec;
std::ignore = ssl_socket.lowest_layer().cancel(ec);
Expand Down
9 changes: 4 additions & 5 deletions extension-framework/src/utils/net/UdpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
* limitations under the License.
*/
#include "utils/net/UdpServer.h"
#include "asio/use_awaitable.hpp"
#include "asio/detached.hpp"
#include "utils/net/AsioCoro.h"

namespace org::apache::nifi::minifi::utils::net {
Expand All @@ -43,10 +41,11 @@ asio::awaitable<void> UdpServer::doReceive() {
continue;
}
buffer.resize(bytes_received);
if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size())
concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), socket.local_endpoint().port()));
else
if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
concurrent_queue_.enqueue(utils::net::Message(std::move(buffer), IpProtocol::UDP, sender_endpoint.address(), sender_endpoint.port(), socket.local_endpoint().port()));
} else {
logger_->log_warn("Queue is full. UDP message ignored.");
}
}
}

Expand Down
20 changes: 12 additions & 8 deletions extensions/standard-processors/processors/GetTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void GetTCP::notifyStop() {
void GetTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
auto flow_file = session.create();
session.writeBuffer(flow_file, message.message_data);
flow_file->setAttribute(GetTCP::SourceEndpoint.name, fmt::format("{}:{}", message.sender_address.to_string(), std::to_string(message.server_port)));
flow_file->setAttribute(GetTCP::SourceEndpoint.name, fmt::format("{}:{}", message.remote_address.to_string(), std::to_string(message.remote_port)));
if (message.is_partial)
session.transfer(flow_file, Partial);
else
Expand All @@ -128,11 +128,12 @@ void GetTCP::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
gsl_Expects(max_batch_size_ > 0);
size_t logs_processed = 0;
while (!client_->queueEmpty() && logs_processed < max_batch_size_) {
utils::net::Message received_message;
if (!client_->tryDequeue(received_message))
if (const auto received_message = client_->tryDequeue()) {
transferAsFlowFile(received_message.value(), session);
++logs_processed;
} else {
break;
transferAsFlowFile(received_message, session);
++logs_processed;
}
}
}

Expand Down Expand Up @@ -175,8 +176,8 @@ bool GetTCP::TcpClient::queueEmpty() const {
return concurrent_queue_.empty();
}

bool GetTCP::TcpClient::tryDequeue(utils::net::Message& received_message) {
return concurrent_queue_.tryDequeue(received_message);
std::optional<utils::net::Message> GetTCP::TcpClient::tryDequeue() {
return concurrent_queue_.tryDequeue();
}

asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
Expand All @@ -203,7 +204,10 @@ asio::awaitable<std::error_code> GetTCP::TcpClient::readLoop(auto& socket) {
continue;

if (!max_queue_size_ || max_queue_size_ > concurrent_queue_.size()) {
utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, socket.lowest_layer().remote_endpoint().address(), socket.lowest_layer().remote_endpoint().port()};
const auto remote_endpoint = socket.lowest_layer().remote_endpoint();
const auto local_endpoint = socket.lowest_layer().local_endpoint();
utils::net::Message message{read_message.substr(0, bytes_read), utils::net::IpProtocol::TCP, remote_endpoint.address(),
remote_endpoint.port(), local_endpoint.port()};
if (previous_didnt_end_with_delimiter || current_doesnt_end_with_delimiter)
message.is_partial = true;
concurrent_queue_.enqueue(std::move(message));
Expand Down
7 changes: 5 additions & 2 deletions extensions/standard-processors/processors/GetTCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,17 @@ class GetTCP : public core::ProcessorImpl {
std::optional<size_t> max_message_size,
std::vector<utils::net::ConnectionId> connections,
std::shared_ptr<core::logging::Logger> logger);

TcpClient(const TcpClient&) = delete;
TcpClient(TcpClient&&) = delete;
TcpClient& operator=(const TcpClient&) = delete;
TcpClient& operator=(TcpClient&&) = delete;
~TcpClient();

void run();
void stop();

bool queueEmpty() const;
bool tryDequeue(utils::net::Message& received_message);
std::optional<utils::net::Message> tryDequeue();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional return values are cleaner than status bool return + conditionally assigned out param, plus it allows us to remove the default empty state from the possible states of the class, simplifying code.


private:
asio::awaitable<void> doReceiveFrom(const utils::net::ConnectionId& connection_id);
Expand Down
4 changes: 2 additions & 2 deletions extensions/standard-processors/processors/ListenSyslog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ void ListenSyslog::transferAsFlowFile(const utils::net::Message& message, core::

session.writeBuffer(flow_file, message.message_data);
flow_file->setAttribute("syslog.protocol", std::string{magic_enum::enum_name(message.protocol)});
flow_file->setAttribute("syslog.port", std::to_string(message.server_port));
flow_file->setAttribute("syslog.sender", message.sender_address.to_string());
flow_file->setAttribute("syslog.port", std::to_string(message.local_port));
flow_file->setAttribute("syslog.sender", message.remote_address.to_string());
session.transfer(flow_file, valid ? Success : Invalid);
}

Expand Down
4 changes: 2 additions & 2 deletions extensions/standard-processors/processors/ListenTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ void ListenTCP::onSchedule(core::ProcessContext& context, core::ProcessSessionFa
void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
auto flow_file = session.create();
session.writeBuffer(flow_file, message.message_data);
flow_file->setAttribute("tcp.port", std::to_string(message.server_port));
flow_file->setAttribute("tcp.sender", message.sender_address.to_string());
flow_file->setAttribute("tcp.port", std::to_string(message.local_port));
flow_file->setAttribute("tcp.sender", message.remote_address.to_string());
session.transfer(flow_file, Success);
}

Expand Down
5 changes: 3 additions & 2 deletions extensions/standard-processors/processors/ListenUDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ void ListenUDP::onSchedule(core::ProcessContext& context, core::ProcessSessionFa
void ListenUDP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
auto flow_file = session.create();
session.writeBuffer(flow_file, message.message_data);
flow_file->setAttribute("udp.port", std::to_string(message.server_port));
flow_file->setAttribute("udp.sender", message.sender_address.to_string());
flow_file->setAttribute(ListeningPort.name, std::to_string(message.local_port));
flow_file->setAttribute(SenderPort.name, std::to_string(message.remote_port));
flow_file->setAttribute(Sender.name, message.remote_address.to_string());
session.transfer(flow_file, Success);
}

Expand Down
5 changes: 3 additions & 2 deletions extensions/standard-processors/processors/ListenUDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ class ListenUDP : public NetworkListenerProcessor {
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Messages received successfully will be sent out this relationship."};
EXTENSIONAPI static constexpr auto Relationships = std::array{Success};

EXTENSIONAPI static constexpr auto PortOutputAttribute = core::OutputAttributeDefinition<0>{"udp.port", {}, "The sending port the messages were received."};
EXTENSIONAPI static constexpr auto ListeningPort = core::OutputAttributeDefinition<0>{"udp.port", {}, "The listening port on which the messages were received."};
EXTENSIONAPI static constexpr auto Sender = core::OutputAttributeDefinition<0>{"udp.sender", {}, "The sending host of the messages."};
EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{PortOutputAttribute, Sender};
EXTENSIONAPI static constexpr auto SenderPort = core::OutputAttributeDefinition<0>{"udp.sender.port", {}, "The sending port of the messages."};
EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array<core::OutputAttributeReference>({ListeningPort, Sender, SenderPort});

void initialize() override;
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ void NetworkListenerProcessor::onTrigger(core::ProcessContext&, core::ProcessSes
gsl_Expects(max_batch_size_ > 0);
size_t logs_processed = 0;
while (!server_->queueEmpty() && logs_processed < max_batch_size_) {
utils::net::Message received_message;
if (!server_->tryDequeue(received_message))
if (const auto received_message = server_->tryDequeue()) {
transferAsFlowFile(received_message.value(), session);
++logs_processed;
} else {
break;
transferAsFlowFile(received_message, session);
++logs_processed;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@

#include <memory>
#include <string>
#include <utility>
#include <thread>

#include "core/ProcessorImpl.h"
#include "minifi-cpp/core/logging/Logger.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "minifi-cpp/core/Property.h"
#include "utils/net/Server.h"

namespace org::apache::nifi::minifi::processors {

class NetworkListenerProcessor : public core::ProcessorImpl {
public:
using ProcessorImpl::ProcessorImpl;
NetworkListenerProcessor(const NetworkListenerProcessor&) = delete;
NetworkListenerProcessor(NetworkListenerProcessor&&) = delete;
NetworkListenerProcessor& operator=(const NetworkListenerProcessor&) = delete;
NetworkListenerProcessor& operator=(NetworkListenerProcessor&&) = delete;
~NetworkListenerProcessor() override;

void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
Expand Down
Loading
Loading