diff --git a/PROCESSORS.md b/PROCESSORS.md
index 2ce4022ecb..1fc8af2383 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -392,7 +392,7 @@ In the list below, the names of required properties appear in bold. Any other pr
### Description
-This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. The the payload of the MQTT message becomes content of a FlowFile
+This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. The the payload of the MQTT message becomes content of a FlowFile. If Record Reader and Record Writer are set, then the MQTT message specific attributes are not set in the flow file, because different attributes can be set for different records. In this case if Add Attributes As Fields is set to true, the attributes will be added to each record as fields.
### Properties
@@ -411,6 +411,9 @@ In the list below, the names of required properties appear in bold. Any other pr
| Attribute From Content Type | | | Name of FlowFile attribute to be filled from content type of received message. MQTT 5.x only. |
| Topic Alias Maximum | 0 | | Maximum number of topic aliases to use. If set to 0, then topic aliases cannot be used. MQTT 5.x only. |
| Receive Maximum | 65535 | | Maximum number of unacknowledged messages allowed. MQTT 5.x only. |
+| Record Reader | | | The Record Reader to use for parsing received MQTT Messages into Records. |
+| Record Writer | | | The Record Writer to use for serializing Records before writing them to a FlowFile. |
+| Add Attributes As Fields | true | true
false | If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained. |
| **Quality of Service** | 0 | 0
1
2 | The Quality of Service (QoS) of messages. |
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |
@@ -435,10 +438,15 @@ In the list below, the names of required properties appear in bold. Any other pr
### Output Attributes
-| Attribute | Relationship | Description |
-|-------------|--------------|---------------------------|
-| mqtt.broker | | URI of the sending broker |
-| mqtt.topic | | Topic of the message |
+| Attribute | Relationship | Description |
+|----------------------|--------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| mqtt.broker | | URI of the sending broker |
+| mqtt.topic | | Topic of the message |
+| mqtt.topic.segment.n | | The nth topic segment of the message |
+| mqtt.qos | | The quality of service for this message. |
+| mqtt.isDuplicate | | Whether or not this message might be a duplicate of one which has already been received. |
+| mqtt.isRetained | | Whether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic. |
+| record.count | | The number of records received |
## ConsumeWindowsEventLog
@@ -2198,6 +2206,8 @@ In the list below, the names of required properties appear in bold. Any other pr
| Retain | false | true
false | Retain published message in broker |
| Message Expiry Interval | | | Time while message is valid and will be forwarded by broker. MQTT 5.x only. |
| Content Type | | | Content type of the message. MQTT 5.x only.
**Supports Expression Language: true** |
+| Record Reader | | | The Record Reader to use for parsing the incoming FlowFile into Records. |
+| Record Writer | | | The Record Writer to use for serializing Records before publishing them as an MQTT Message. |
| **Quality of Service** | 0 | 0
1
2 | The Quality of Service (QoS) of messages. |
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 0fcf30a8fc..2a2da597a8 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -11,3 +11,4 @@ prometheus-api-client==0.5.5
humanfriendly==10.0
requests<2.29 # https://github.com/docker/docker-py/issues/3113
couchbase==4.3.5
+paho-mqtt==2.1.0
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index aac42d4aad..620f842677 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -37,6 +37,7 @@
from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
from .checkers.ModbusChecker import ModbusChecker
from .checkers.CouchbaseChecker import CouchbaseChecker
+from .checkers.MqttHelper import MqttHelper
from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check
@@ -58,6 +59,7 @@ def __init__(self, context, feature_id):
self.modbus_checker = ModbusChecker(self.container_communicator)
self.couchbase_checker = CouchbaseChecker()
self.kafka_checker = KafkaHelper(self.container_communicator, feature_id)
+ self.mqtt_helper = MqttHelper()
def cleanup(self):
self.container_store.cleanup()
@@ -457,3 +459,6 @@ def enable_ssl_in_nifi(self):
def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
+
+ def publish_test_mqtt_message(self, topic: str, message: str):
+ self.mqtt_helper.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/cluster/checkers/MqttHelper.py b/docker/test/integration/cluster/checkers/MqttHelper.py
new file mode 100644
index 0000000000..719911c970
--- /dev/null
+++ b/docker/test/integration/cluster/checkers/MqttHelper.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import paho.mqtt.client as mqtt
+
+
+class MqttHelper:
+ def publish_test_mqtt_message(self, topic: str, message: str):
+ client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id")
+ client.connect("localhost", 1883, 60)
+ client.publish(topic, message)
+ client.disconnect()
diff --git a/docker/test/integration/cluster/containers/FlowContainer.py b/docker/test/integration/cluster/containers/FlowContainer.py
index 5b5dabe718..b3ac65c68b 100644
--- a/docker/test/integration/cluster/containers/FlowContainer.py
+++ b/docker/test/integration/cluster/containers/FlowContainer.py
@@ -47,6 +47,12 @@ def add_start_node(self, node):
def add_controller(self, controller):
self.controllers.append(controller)
+ def get_controller(self, name):
+ for controller in self.controllers:
+ if controller.name == name:
+ return controller
+ raise ValueError(f"Controller with name '{name}' not found")
+
def add_parameter_to_flow_config(self, parameter_context_name, parameter_name, parameter_value):
if parameter_context_name in self.parameter_contexts:
self.parameter_contexts[parameter_context_name].append(Parameter(parameter_name, parameter_value))
diff --git a/docker/test/integration/cluster/containers/MqttBrokerContainer.py b/docker/test/integration/cluster/containers/MqttBrokerContainer.py
index b33c41b7e1..faa168f311 100644
--- a/docker/test/integration/cluster/containers/MqttBrokerContainer.py
+++ b/docker/test/integration/cluster/containers/MqttBrokerContainer.py
@@ -34,6 +34,7 @@ def deploy(self):
self.image_store.get_image(self.get_engine()),
detach=True,
name=self.name,
+ ports={'1883/tcp': 1883},
network=self.network.name,
entrypoint=self.command)
logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 455a8b5ce8..e74e1772fd 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -517,3 +517,6 @@ def enable_ssl_in_nifi(self):
def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
+
+ def publish_test_mqtt_message(self, topic, message):
+ self.cluster.publish_test_mqtt_message(topic, message)
diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature
index 17300956ec..64ef62f6b4 100644
--- a/docker/test/integration/features/mqtt.feature
+++ b/docker/test/integration/features/mqtt.feature
@@ -90,8 +90,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
And the "MQTT Version" property of the ConsumeMQTT processor is set to ""
And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And a LogAttribute processor
And "ConsumeMQTT" processor is a start node
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+ And the "success" relationship of the PutFile processor is connected to the LogAttribute
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
@@ -101,6 +103,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
+ And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 60 seconds
+ And the Minifi logs contain the following message: "key:mqtt.topic value:testtopic" in less than 1 seconds
+ And the Minifi logs contain the following message: "key:mqtt.topic.segment.0 value:testtopic" in less than 1 seconds
+ And the Minifi logs contain the following message: "key:mqtt.qos value:0" in less than 1 seconds
+ And the Minifi logs contain the following message: "key:mqtt.isDuplicate value:false" in less than 1 seconds
+ And the Minifi logs contain the following message: "key:mqtt.isRetained value:false" in less than 1 seconds
Examples: MQTT versions
| version |
@@ -505,3 +513,48 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And "publisher-client" flow is killed
And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client"
And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds
+
+ Scenario: A MiNiFi instance uses record reader and writer to convert consumed message from an MQTT broker
+ Given a XMLReader controller service is set up
+ And a JsonRecordSetWriter controller service is set up with "Array" output grouping
+ And a ConsumeMQTT processor with the "Topic" property set to "test/my/topic"
+ And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.x AUTO"
+ And the "Record Reader" property of the ConsumeMQTT processor is set to "XMLReader"
+ And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And a LogAttribute processor
+ And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
+ And the "success" relationship of the PutFile processor is connected to the LogAttribute
+ And an MQTT broker is set up in correspondence with the ConsumeMQTT
+
+ When both instances start up
+ And a test message "test" is published to the MQTT broker on topic "test/my/topic"
+
+ Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
+ And a flowfile with the JSON content '[{"_isRetained": false, "_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], "_topic": "test/my/topic", "element": "test"}]' is placed in the monitored directory in less than 60 seconds
+ And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
+ And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
+
+ Scenario: A MiNiFi instance uses record reader and writer to convert and publish records to an MQTT broker
+ Given a JsonTreeReader controller service is set up
+ And a XMLRecordSetWriter controller service is set up
+ And the "Name of Record Tag" property of the XMLRecordSetWriter controller is set to "record"
+ And the "Name of Root Tag" property of the XMLRecordSetWriter controller is set to "root"
+ And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+ And a file with the content '[{"string": "test"}, {"int": 42}]' is present in '/tmp/input'
+ And a PublishMQTT processor set up to communicate with an MQTT broker instance
+ And the "MQTT Version" property of the PublishMQTT processor is set to "3.x AUTO"
+ And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
+ And the "Record Writer" property of the PublishMQTT processor is set to "XMLRecordSetWriter"
+ And a UpdateAttribute processor with the "filename" property set to "${UUID()}.xml"
+ And a PutFile processor with the "Directory" property set to "/tmp/output"
+ And the "success" relationship of the GetFile processor is connected to the PublishMQTT
+ And the "success" relationship of the PublishMQTT processor is connected to the UpdateAttribute
+ And the "success" relationship of the UpdateAttribute processor is connected to the PutFile
+ And an MQTT broker is set up in correspondence with the PublishMQTT
+
+ When both instances start up
+
+ Then two flowfiles with the contents 'test' and '42' are placed in the monitored directory in less than 60 seconds
+ And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(72 bytes\)"
+ And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(64 bytes\)"
diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py
index 129d8df57d..590fa06d0b 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -25,6 +25,8 @@
from minifi.controllers.KubernetesControllerService import KubernetesControllerService
from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
from minifi.controllers.JsonTreeReader import JsonTreeReader
+from minifi.controllers.XMLReader import XMLReader
+from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
from minifi.controllers.XMLReader import XMLReader
@@ -182,6 +184,12 @@ def step_impl(context, property_name, processor_name, property_value):
processor.set_property(property_name, property_value)
+@given("the \"{property_name}\" property of the {controller_name} controller is set to \"{property_value}\"")
+def step_impl(context, property_name, controller_name, property_value):
+ container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+ container.get_controller(controller_name).set_property(property_name, property_value)
+
+
@given("the \"{property_name}\" properties of the {processor_name_one} and {processor_name_two} processors are set to the same random guid")
def step_impl(context, property_name, processor_name_one, processor_name_two):
uuid_str = str(uuid.uuid4())
@@ -430,20 +438,30 @@ def step_impl(context, processor_name):
# Record set reader and writer
-@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping")
-def step_impl(context, output_grouping: str):
+@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping in the \"{minifi_container_name}\" flow")
+def step_impl(context, output_grouping: str, minifi_container_name: str):
json_record_set_writer = JsonRecordSetWriter(name="JsonRecordSetWriter", output_grouping=output_grouping)
- container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+ container = context.test.acquire_container(context=context, name=minifi_container_name)
container.add_controller(json_record_set_writer)
-@given("a JsonTreeReader controller service is set up")
-def step_impl(context):
+@given("a JsonTreeReader controller service is set up in the \"{minifi_container_name}\" flow")
+def step_impl(context, minifi_container_name: str):
json_record_set_reader = JsonTreeReader("JsonTreeReader")
- container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+ container = context.test.acquire_container(context=context, name=minifi_container_name)
container.add_controller(json_record_set_reader)
+@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping")
+def step_impl(context, output_grouping: str):
+ context.execute_steps(f"given a JsonRecordSetWriter controller service is set up with \"{output_grouping}\" output grouping in the \"minifi-cpp-flow\" flow")
+
+
+@given("a JsonTreeReader controller service is set up")
+def step_impl(context):
+ context.execute_steps("given a JsonTreeReader controller service is set up in the \"minifi-cpp-flow\" flow")
+
+
@given("a XMLReader controller service is set up")
def step_impl(context):
xml_reader = XMLReader("XMLReader")
@@ -451,6 +469,13 @@ def step_impl(context):
container.add_controller(xml_reader)
+@given("a XMLRecordSetWriter controller service is set up")
+def step_impl(context):
+ xml_record_set_writer = XMLRecordSetWriter("XMLRecordSetWriter")
+ container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+ container.add_controller(xml_record_set_writer)
+
+
# Kubernetes
def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
@@ -775,6 +800,7 @@ def step_impl(context, content, duration):
@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
+@then("two flowfiles with the contents '{content_1}' and '{content_2}' are placed in the monitored directory in less than {duration}")
def step_impl(context, content_1, content_2, duration):
context.test.check_for_multiple_files_generated(2, humanfriendly.parse_timespan(duration), [content_1, content_2])
@@ -933,6 +959,11 @@ def step_impl(context, log_count, log_pattern):
context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=int(log_count))
+@when("a test message \"{message}\" is published to the MQTT broker on topic \"{topic}\"")
+def step_impl(context, message, topic):
+ context.test.publish_test_mqtt_message(topic, message)
+
+
@then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
def step_impl(context, minifi_container_name, log_pattern, duration):
context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, humanfriendly.parse_timespan(duration), count=1)
diff --git a/docker/test/integration/minifi/controllers/XMLRecordSetWriter.py b/docker/test/integration/minifi/controllers/XMLRecordSetWriter.py
new file mode 100644
index 0000000000..82c2df6f3b
--- /dev/null
+++ b/docker/test/integration/minifi/controllers/XMLRecordSetWriter.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.ControllerService import ControllerService
+
+
+class XMLRecordSetWriter(ControllerService):
+ def __init__(self, name=None):
+ super(XMLRecordSetWriter, self).__init__(name=name)
+ self.service_class = 'XMLRecordSetWriter'
diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py
index 02d32e4a77..bc26c27bce 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -35,3 +35,6 @@ def __init__(self, name=None, properties=None):
self.properties = properties
self.linked_services = []
+
+ def set_property(self, name, value):
+ self.properties[name] = value
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
index 3fc5e9f664..4d928fafe7 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.cpp
@@ -109,6 +109,20 @@ void AbstractMQTTProcessor::onSchedule(core::ProcessContext& context, core::Proc
readProperties(context);
checkProperties(context);
initializeClient();
+
+ auto record_set_reader = utils::parseOptionalControllerService(context, RecordReader, getUUID());
+ auto record_set_writer = utils::parseOptionalControllerService(context, RecordWriter, getUUID());
+
+ if ((record_set_reader == nullptr) != (record_set_writer == nullptr)) {
+ throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT processor requires both or neither Record Reader and Record Writer to be set");
+ }
+
+ if (record_set_reader) {
+ record_converter_ = core::RecordConverter{
+ .record_set_reader = gsl::make_not_null(std::move(record_set_reader)),
+ .record_set_writer = gsl::make_not_null(std::move(record_set_writer)),
+ };
+ }
}
void AbstractMQTTProcessor::initializeClient() {
diff --git a/extensions/mqtt/processors/AbstractMQTTProcessor.h b/extensions/mqtt/processors/AbstractMQTTProcessor.h
index 303c0f492f..5b7a8def6b 100644
--- a/extensions/mqtt/processors/AbstractMQTTProcessor.h
+++ b/extensions/mqtt/processors/AbstractMQTTProcessor.h
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include "minifi-cpp/core/PropertyDefinition.h"
#include "core/ProcessorImpl.h"
@@ -32,6 +33,7 @@
#include "core/logging/LoggerFactory.h"
#include "utils/Enum.h"
#include "MQTTAsync.h"
+#include "minifi-cpp/controllers/RecordConverter.h"
namespace org::apache::nifi::minifi::processors::mqtt {
enum class MqttVersions {
@@ -166,6 +168,14 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
.withDescription("Private key passphrase")
.isSensitive(true)
.build();
+ EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
+ .withDescription("The Record Reader to use for parsing received MQTT Messages into Records.")
+ .withAllowedTypes()
+ .build();
+ EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
+ .withDescription("The Record Writer to use for serializing Records before writing them to a FlowFile.")
+ .withAllowedTypes()
+ .build();
EXTENSIONAPI static constexpr auto BasicProperties = std::to_array({
BrokerURI,
ClientID,
@@ -186,7 +196,9 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
SecurityCA,
SecurityCert,
SecurityPrivateKey,
- SecurityPrivateKeyPassword
+ SecurityPrivateKeyPassword,
+ RecordReader,
+ RecordWriter
});
void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) override;
@@ -256,13 +268,15 @@ class AbstractMQTTProcessor : public core::ProcessorImpl {
std::optional maximum_session_expiry_interval_;
std::optional server_keep_alive_;
+ std::optional record_converter_;
+
private:
using ConnectFinishedTask = std::packaged_task;
/**
* Initializes local MQTT client and connects to broker.
*/
- void initializeClient();
+ virtual void initializeClient();
/**
* Calls disconnect() and releases local MQTT client
diff --git a/extensions/mqtt/processors/ConsumeMQTT.cpp b/extensions/mqtt/processors/ConsumeMQTT.cpp
index dd66c89138..5b5405a0a9 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.cpp
+++ b/extensions/mqtt/processors/ConsumeMQTT.cpp
@@ -16,7 +16,6 @@
*/
#include "ConsumeMQTT.h"
-
#include
#include
#include
@@ -26,9 +25,10 @@
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
+#include "io/BufferStream.h"
+#include "utils/ProcessorConfigUtils.h"
#include "utils/StringUtils.h"
#include "utils/ValueParser.h"
-#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
@@ -37,6 +37,12 @@ void ConsumeMQTT::initialize() {
setSupportedRelationships(Relationships);
}
+void ConsumeMQTT::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) {
+ AbstractMQTTProcessor::onSchedule(context, factory);
+
+ add_attributes_as_fields_ = utils::parseBoolProperty(context, AddAttributesAsFields);
+}
+
void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
if (queue_.size_approx() >= max_queue_size_) {
logger_->log_error("MQTT queue full");
@@ -58,8 +64,57 @@ void ConsumeMQTT::readProperties(core::ProcessContext& context) {
receive_maximum_ = gsl::narrow(utils::parseU64Property(context, ReceiveMaximum));
}
-void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& session) {
- std::queue msg_queue = getReceivedMqttMessages();
+void ConsumeMQTT::addAttributesAsRecordFields(core::RecordSet& new_records, const SmartMessage& message) const {
+ if (!add_attributes_as_fields_) {
+ return;
+ }
+
+ for (auto& record : new_records) {
+ record.emplace("_topic", core::RecordField(message.topic));
+ auto topic_segments = utils::string::split(message.topic, "/");
+ core::RecordArray topic_segments_array;
+ for (const auto& topic_segment : topic_segments) {
+ topic_segments_array.emplace_back(core::RecordField(topic_segment));
+ }
+ record.emplace("_topicSegments", core::RecordField(std::move(topic_segments_array)));
+ record.emplace("_qos", core::RecordField(message.contents->qos));
+ record.emplace("_isDuplicate", core::RecordField(message.contents->dup > 0));
+ record.emplace("_isRetained", core::RecordField(message.contents->retained > 0));
+ }
+}
+
+void ConsumeMQTT::transferMessagesAsRecords(core::ProcessSession& session) {
+ gsl_Expects(record_converter_);
+ auto msg_queue = getReceivedMqttMessages();
+ core::RecordSet record_set;
+ while (!msg_queue.empty()) {
+ io::BufferStream buffer_stream;
+ buffer_stream.write(reinterpret_cast(msg_queue.front().contents->payload), gsl::narrow(msg_queue.front().contents->payloadlen));
+ auto new_records_result = record_converter_->record_set_reader->read(buffer_stream);
+ if (!new_records_result) {
+ logger_->log_error("Failed to read records from MQTT message: {}", new_records_result.error());
+ msg_queue.pop();
+ continue;
+ }
+ auto& new_records = new_records_result.value();
+ addAttributesAsRecordFields(new_records, msg_queue.front());
+ record_set.reserve(record_set.size() + new_records.size());
+ record_set.insert(record_set.end(), std::make_move_iterator(new_records.begin()), std::make_move_iterator(new_records.end()));
+ msg_queue.pop();
+ }
+ if (record_set.empty()) {
+ logger_->log_debug("No records to write, skipping FlowFile creation");
+ return;
+ }
+ std::shared_ptr flow_file = session.create();
+ record_converter_->record_set_writer->write(record_set, flow_file, session);
+ session.putAttribute(*flow_file, RecordCountOutputAttribute.name, std::to_string(record_set.size()));
+ session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
+ session.transfer(flow_file, Success);
+}
+
+void ConsumeMQTT::transferMessagesAsFlowFiles(core::ProcessSession& session) {
+ auto msg_queue = getReceivedMqttMessages();
while (!msg_queue.empty()) {
const auto& message = msg_queue.front();
std::shared_ptr flow_file = session.create();
@@ -76,6 +131,13 @@ void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& ses
putUserPropertiesAsAttributes(message, flow_file, session);
session.putAttribute(*flow_file, BrokerOutputAttribute.name, uri_);
session.putAttribute(*flow_file, TopicOutputAttribute.name, message.topic);
+ auto topic_segments = utils::string::split(message.topic, "/");
+ for (size_t i = 0; i < topic_segments.size(); ++i) {
+ session.putAttribute(*flow_file, "mqtt.topic.segment." + std::to_string(i), topic_segments[i]);
+ }
+ session.putAttribute(*flow_file, QosOutputAttribute.name, std::to_string(message.contents->qos));
+ session.putAttribute(*flow_file, IsDuplicateOutputAttribute.name, message.contents->dup > 0 ? "true" : "false");
+ session.putAttribute(*flow_file, IsRetainedOutputAttribute.name, message.contents->retained > 0 ? "true" : "false");
fillAttributeFromContentType(message, flow_file, session);
logger_->log_debug("ConsumeMQTT processing success for the flow with UUID {} topic {}", flow_file->getUUIDStr(), message.topic);
session.transfer(flow_file, Success);
@@ -84,6 +146,14 @@ void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& ses
}
}
+void ConsumeMQTT::onTriggerImpl(core::ProcessContext&, core::ProcessSession& session) {
+ if (record_converter_) {
+ transferMessagesAsRecords(session);
+ } else {
+ transferMessagesAsFlowFiles(session);
+ }
+}
+
std::queue ConsumeMQTT::getReceivedMqttMessages() {
std::queue msg_queue;
SmartMessage message;
diff --git a/extensions/mqtt/processors/ConsumeMQTT.h b/extensions/mqtt/processors/ConsumeMQTT.h
index 384fdeed3e..b4434d46ea 100644
--- a/extensions/mqtt/processors/ConsumeMQTT.h
+++ b/extensions/mqtt/processors/ConsumeMQTT.h
@@ -42,7 +42,8 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
using AbstractMQTTProcessor::AbstractMQTTProcessor;
EXTENSIONAPI static constexpr const char* Description = "This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. "
- "The the payload of the MQTT message becomes content of a FlowFile";
+ "The the payload of the MQTT message becomes content of a FlowFile. If Record Reader and Record Writer are set, then the MQTT message specific attributes are not set in the flow file, "
+ "because different attributes can be set for different records. In this case if Add Attributes As Fields is set to true, the attributes will be added to each record as fields.";
EXTENSIONAPI static constexpr auto Topic = core::PropertyDefinitionBuilder<>::createProperty("Topic")
.withDescription("The topic to subscribe to.")
@@ -81,6 +82,11 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
.withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
.withDefaultValue(MQTT_MAX_RECEIVE_MAXIMUM_STR)
.build();
+ EXTENSIONAPI static constexpr auto AddAttributesAsFields = core::PropertyDefinitionBuilder<>::createProperty("Add Attributes As Fields")
+ .withDescription("If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
+ .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
+ .withDefaultValue("true")
+ .build();
EXTENSIONAPI static constexpr auto Properties = utils::array_cat(AbstractMQTTProcessor::BasicProperties, std::to_array({
Topic,
CleanSession,
@@ -89,7 +95,8 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
QueueBufferMaxMessage,
AttributeFromContentType,
TopicAliasMaximum,
- ReceiveMaximum
+ ReceiveMaximum,
+ AddAttributesAsFields
}), AbstractMQTTProcessor::AdvancedProperties);
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"};
@@ -97,7 +104,15 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
EXTENSIONAPI static constexpr auto BrokerOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.broker", {}, "URI of the sending broker"};
EXTENSIONAPI static constexpr auto TopicOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.topic", {}, "Topic of the message"};
- EXTENSIONAPI static constexpr auto OutputAttributes = std::array{BrokerOutputAttribute, TopicOutputAttribute};
+ EXTENSIONAPI static constexpr auto TopicSegmentOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.topic.segment.n", {}, "The nth topic segment of the message"};
+ EXTENSIONAPI static constexpr auto QosOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.qos", {}, "The quality of service for this message."};
+ EXTENSIONAPI static constexpr auto IsDuplicateOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.isDuplicate", {},
+ "Whether or not this message might be a duplicate of one which has already been received."};
+ EXTENSIONAPI static constexpr auto IsRetainedOutputAttribute = core::OutputAttributeDefinition<0>{"mqtt.isRetained", {},
+ "Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published on the topic."};
+ EXTENSIONAPI static constexpr auto RecordCountOutputAttribute = core::OutputAttributeDefinition<0>{"record.count", {}, "The number of records received"};
+ EXTENSIONAPI static constexpr auto OutputAttributes = std::to_array({BrokerOutputAttribute, TopicOutputAttribute, TopicSegmentOutputAttribute,
+ QosOutputAttribute, IsDuplicateOutputAttribute, IsRetainedOutputAttribute, RecordCountOutputAttribute});
EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
@@ -109,6 +124,15 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
void readProperties(core::ProcessContext& context) override;
void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;
+ void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) override;
+
+ protected:
+ /**
+ * Enqueues received MQTT message into internal message queue.
+ * Called as a callback on a separate thread than onTrigger, as a reaction to message incoming.
+ * @param message message to put to queue
+ */
+ void enqueueReceivedMQTTMsg(SmartMessage message);
private:
class WriteCallback {
@@ -142,13 +166,6 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
void onSubscriptionFailure5(MQTTAsync_failureData5* response);
void onMessageReceived(SmartMessage smart_message) override;
- /**
- * Enqueues received MQTT message into internal message queue.
- * Called as a callback on a separate thread than onTrigger, as a reaction to message incoming.
- * @param message message to put to queue
- */
- void enqueueReceivedMQTTMsg(SmartMessage message);
-
/**
* Called in onTrigger to return the whole internal message queue
* @return message queue of messages received since previous onTrigger
@@ -193,6 +210,10 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
void setProcessorSpecificMqtt5ConnectOptions(MQTTProperties& connect_props) const override;
+ void transferMessagesAsRecords(core::ProcessSession& session);
+ void addAttributesAsRecordFields(core::RecordSet& new_records, const SmartMessage& message) const;
+ void transferMessagesAsFlowFiles(core::ProcessSession& session);
+
std::string topic_;
bool clean_session_ = true;
bool clean_start_ = true;
@@ -205,6 +226,7 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor {
std::unordered_map alias_to_topic_;
moodycamel::ConcurrentQueue queue_;
+ bool add_attributes_as_fields_ = true;
};
} // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/mqtt/processors/PublishMQTT.cpp b/extensions/mqtt/processors/PublishMQTT.cpp
index 23a8af2517..1952dc057f 100644
--- a/extensions/mqtt/processors/PublishMQTT.cpp
+++ b/extensions/mqtt/processors/PublishMQTT.cpp
@@ -56,29 +56,62 @@ void PublishMQTT::readProperties(core::ProcessContext& context) {
}
void PublishMQTT::onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) {
- std::shared_ptr flow_file = session.get();
+ std::shared_ptr original_flow_file = session.get();
- if (!flow_file) {
+ if (!original_flow_file) {
context.yield();
return;
}
+ std::vector> flow_files;
+ if (record_converter_) {
+ nonstd::expected record_set;
+ session.read(original_flow_file, [this, &record_set](const std::shared_ptr& input_stream) {
+ record_set = record_converter_->record_set_reader->read(*input_stream);
+ return gsl::narrow(input_stream->size());
+ });
+
+ if (!record_set) {
+ logger_->log_error("Failed to read FlowFile [{}] as RecordSet, error: {}", original_flow_file->getUUIDStr(), record_set.error().message());
+ session.transfer(original_flow_file, Failure);
+ return;
+ }
+
+ for (auto&& record : *record_set) {
+ auto new_flow_file = session.create(original_flow_file.get());
+ if (!new_flow_file) {
+ logger_->log_error("Failed to create new FlowFile from record");
+ continue;
+ }
+ std::vector records;
+ records.emplace_back(std::move(record));
+ record_converter_->record_set_writer->write(records, new_flow_file, session);
+ flow_files.push_back(std::move(new_flow_file));
+ }
+
+ session.remove(original_flow_file);
+ } else {
+ flow_files.push_back(original_flow_file);
+ }
+
// broker's Receive Maximum can change after reconnect
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
- const auto topic = getTopic(context, flow_file.get());
- try {
- const auto result = session.readBuffer(flow_file);
- if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file.get()), flow_file)) {
- logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on broker {}", flow_file->getUUIDStr(), topic, uri_);
+ for (const auto& flow_file : flow_files) {
+ const auto topic = getTopic(context, flow_file.get());
+ try {
+ const auto result = session.readBuffer(flow_file);
+ if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file.get()), flow_file)) {
+ logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on broker {}", flow_file->getUUIDStr(), topic, uri_);
+ session.transfer(flow_file, Failure);
+ return;
+ }
+ logger_->log_debug("Sent flow file [{}] with length {} to MQTT topic '{}' on broker {}", flow_file->getUUIDStr(), result.status, topic, uri_);
+ session.transfer(flow_file, Success);
+ } catch (const Exception& ex) {
+ logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on broker {}, exception string: '{}'", flow_file->getUUIDStr(), topic, uri_, ex.what());
session.transfer(flow_file, Failure);
- return;
}
- logger_->log_debug("Sent flow file [{}] with length {} to MQTT topic '{}' on broker {}", flow_file->getUUIDStr(), result.status, topic, uri_);
- session.transfer(flow_file, Success);
- } catch (const Exception& ex) {
- logger_->log_error("Failed to send flow file [{}] to MQTT topic '{}' on broker {}, exception string: '{}'", flow_file->getUUIDStr(), topic, uri_, ex.what());
- session.transfer(flow_file, Failure);
}
}
diff --git a/extensions/mqtt/processors/PublishMQTT.h b/extensions/mqtt/processors/PublishMQTT.h
index c59c4bd3ff..4e0145486a 100644
--- a/extensions/mqtt/processors/PublishMQTT.h
+++ b/extensions/mqtt/processors/PublishMQTT.h
@@ -86,6 +86,9 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
void onTriggerImpl(core::ProcessContext& context, core::ProcessSession& session) override;
void initialize() override;
+ protected:
+ virtual bool sendMessage(const std::vector& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr& flow_file);
+
private:
/**
* Counts unacknowledged QoS 1 and QoS 2 messages to respect broker's Receive Maximum
@@ -134,16 +137,6 @@ class PublishMQTT : public processors::AbstractMQTTProcessor {
*/
std::string getContentType(core::ProcessContext& context, const core::FlowFile* const flow_file) const;
- /**
- * Sends an MQTT message asynchronously
- * @param buffer contents of the message
- * @param topic topic of the message
- * @param content_type Content Type for MQTT 5
- * @param flow_file Flow File being processed
- * @return success of message sending
- */
- bool sendMessage(const std::vector& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr& flow_file);
-
/**
* Callback for asynchronous message sending
* @param success if message sending was successful
diff --git a/extensions/mqtt/tests/ConsumeMQTTTests.cpp b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
index 6e22381c6c..8b02e2df19 100644
--- a/extensions/mqtt/tests/ConsumeMQTTTests.cpp
+++ b/extensions/mqtt/tests/ConsumeMQTTTests.cpp
@@ -20,139 +20,350 @@
#include "catch2/matchers/catch_matchers_string.hpp"
#include "unit/TestBase.h"
#include "../processors/ConsumeMQTT.h"
+#include "core/Resource.h"
+#include "unit/SingleProcessorTestController.h"
+#include "rapidjson/document.h"
+#include "unit/ProcessorUtils.h"
-namespace {
-struct Fixture {
- Fixture() {
- LogTestController::getInstance().setDebug();
- plan_ = testController_.createPlan();
- consumeMqttProcessor_ = plan_->addProcessor("ConsumeMQTT", "consumeMqttProcessor");
+namespace org::apache::nifi::minifi::test {
+void verifyXmlJsonResult(const std::string& json_content, size_t expected_record_count, bool add_attributes_as_fields) {
+ rapidjson::Document document;
+ document.Parse(json_content.c_str());
+ REQUIRE(document.IsArray());
+ REQUIRE(document.GetArray().Size() == expected_record_count);
+ for (size_t i = 0; i < expected_record_count; ++i) {
+ auto& current_record = document[gsl::narrow(i)];
+ REQUIRE(current_record.IsObject());
+ REQUIRE(current_record.HasMember("int_value"));
+ uint64_t int_result = current_record["int_value"].GetInt64();
+ CHECK(int_result == 42);
+ REQUIRE(current_record.HasMember("string_value"));
+ std::string string_result = current_record["string_value"].GetString();
+ CHECK(string_result == "test");
+
+ if (add_attributes_as_fields) {
+ string_result = current_record["_topic"].GetString();
+ CHECK(string_result == "mytopic/segment/" + std::to_string(i));
+ auto array = current_record["_topicSegments"].GetArray();
+ CHECK(array.Size() == 3);
+ string_result = array[0].GetString();
+ CHECK(string_result == "mytopic");
+ string_result = array[1].GetString();
+ CHECK(string_result == "segment");
+ string_result = array[2].GetString();
+ CHECK(string_result == std::to_string(i));
+ int_result = current_record["_qos"].GetInt64();
+ CHECK(int_result == i);
+ bool bool_result = current_record["_isDuplicate"].GetBool();
+ if (i == 0) {
+ CHECK_FALSE(bool_result);
+ } else {
+ CHECK(bool_result);
+ }
+ bool_result = current_record["_isRetained"].GetBool();
+ if (i == 0) {
+ CHECK_FALSE(bool_result);
+ } else {
+ CHECK(bool_result);
+ }
+ } else {
+ CHECK_FALSE(current_record.HasMember("_topic"));
+ CHECK_FALSE(current_record.HasMember("_qos"));
+ CHECK_FALSE(current_record.HasMember("_isDuplicate"));
+ CHECK_FALSE(current_record.HasMember("_isRetained"));
+ }
}
+}
- Fixture(Fixture&&) = delete;
- Fixture(const Fixture&) = delete;
- Fixture& operator=(Fixture&&) = delete;
- Fixture& operator=(const Fixture&) = delete;
+class TestConsumeMQTTProcessor : public minifi::processors::ConsumeMQTT {
+ public:
+ using SmartMessage = processors::AbstractMQTTProcessor::SmartMessage;
+ using MQTTMessageDeleter = processors::AbstractMQTTProcessor::MQTTMessageDeleter;
+ explicit TestConsumeMQTTProcessor(minifi::core::ProcessorMetadata metadata)
+ : minifi::processors::ConsumeMQTT(std::move(metadata)) {}
- ~Fixture() {
+ using ConsumeMQTT::enqueueReceivedMQTTMsg;
+
+ void initializeClient() override {
+ }
+
+ void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override {
+ minifi::processors::ConsumeMQTT::onTriggerImpl(context, session);
+ }
+};
+
+REGISTER_RESOURCE(TestConsumeMQTTProcessor, Processor);
+
+struct ConsumeMqttTestFixture {
+ ConsumeMqttTestFixture()
+ : test_controller_(utils::make_processor("TestConsumeMQTTProcessor")),
+ consume_mqtt_processor_(test_controller_.getProcessor()) {
+ REQUIRE(consume_mqtt_processor_ != nullptr);
+ LogTestController::getInstance().setDebug();
+ }
+
+ ConsumeMqttTestFixture(ConsumeMqttTestFixture&&) = delete;
+ ConsumeMqttTestFixture(const ConsumeMqttTestFixture&) = delete;
+ ConsumeMqttTestFixture& operator=(ConsumeMqttTestFixture&&) = delete;
+ ConsumeMqttTestFixture& operator=(const ConsumeMqttTestFixture&) = delete;
+
+ ~ConsumeMqttTestFixture() {
LogTestController::getInstance().reset();
}
- TestController testController_;
- std::shared_ptr plan_;
- core::Processor* consumeMqttProcessor_ = nullptr;
+ SingleProcessorTestController test_controller_;
+ core::Processor* consume_mqtt_processor_ = nullptr;
};
-} // namespace
using namespace std::literals::chrono_literals;
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
- Catch::Matchers::EndsWith("Expected valid value from \"consumeMqttProcessor::Topic\", but got PropertyNotSet (Property Error:2)"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyTopic", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
+ Catch::Matchers::EndsWith("Expected valid value from \"TestConsumeMQTTProcessor::Topic\", but got PropertyNotSet (Property Error:2)"));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE_THROWS_WITH(plan_->scheduleProcessor(consumeMqttProcessor_),
- Catch::Matchers::EndsWith("Expected valid value from \"consumeMqttProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_EmptyBrokerURI", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_),
+ Catch::Matchers::EndsWith("Expected valid value from \"TestConsumeMQTTProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithID", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
- "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 1s));
+ "by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 0s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithQoS0", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanSession.name, "false"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
"by the broker when QoS is less than 1 for durable (non-clean) sessions. Only subscriptions are preserved.", 1s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithID_V_5", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name, std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithID_V_5", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "1"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
+ std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE_FALSE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
- "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 1s));
+ "by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 0s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name, std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_DurableSessionWithQoS0_V_5", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::ClientID.name, "subscriber"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::QoS.name, "0"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
+ std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] Messages are not preserved during client disconnection "
"by the broker when QoS is less than 1 for durable (Session Expiry Interval > 0) sessions. Only subscriptions are preserved.", 1s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanStart_V_3", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanStart.name, "true"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_CleanStart_V_3", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanStart.name, "true"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Clean Start. Property is not used.", 1s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_SessionExpiryInterval_V_3", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_SessionExpiryInterval_V_3", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "1 h"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Session Expiry Intervals. Property is not used.", 1s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_CleanSession_V_5", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::MqttVersion.name, std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "0 s"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::CleanSession.name, "true"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_CleanSession_V_5", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::MqttVersion.name,
+ std::string{magic_enum::enum_name(minifi::processors::mqtt::MqttVersions::V_5_0)}));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::SessionExpiryInterval.name, "0 s"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::CleanSession.name, "true"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 5.0 specification does not support Clean Session. Property is not used.", 1s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_TopicAliasMaximum_V_3", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::TopicAliasMaximum.name, "1"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_TopicAliasMaximum_V_3", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::TopicAliasMaximum.name, "1"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Topic Alias Maximum. Property is not used.", 1s));
}
-TEST_CASE_METHOD(Fixture, "ConsumeMQTTTest_ReceiveMaximum_V_3", "[consumeMQTTTest]") {
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
- REQUIRE(consumeMqttProcessor_->setProperty(minifi::processors::ConsumeMQTT::ReceiveMaximum.name, "1"));
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "ConsumeMQTTTest_ReceiveMaximum_V_3", "[consumeMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::ReceiveMaximum.name, "1"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(consumeMqttProcessor_));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(consume_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Receive Maximum. Property is not used.", 1s));
}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read XML messages and write them to json records", "[consumeMQTTTest]") {
+ test_controller_.plan->addController("XMLReader", "XMLReader");
+ test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+
+ bool add_attributes_as_fields = true;
+ SECTION("Add attributes as fields by default") {
+ }
+
+ SECTION("Do not add attributes as fields") {
+ add_attributes_as_fields = false;
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::AddAttributesAsFields.name, "false"));
+ }
+
+ const size_t expected_record_count = 2;
+ const std::string payload = R"(42test)";
+ for (size_t i = 0; i < expected_record_count; ++i) {
+ TestConsumeMQTTProcessor::SmartMessage message{std::unique_ptr(
+ new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = gsl::narrow(i), .payloadlen = gsl::narrow(payload.size()),
+ .payload = const_cast(payload.data()), .qos = gsl::narrow(i), .retained = gsl::narrow(i), .dup = gsl::narrow(i),
+ .msgid = gsl::narrow(i + 1), .properties = {}}),
+ std::string{"mytopic/segment/" + std::to_string(i)}}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+
+ auto& test_processor = dynamic_cast(consume_mqtt_processor_->getImpl());
+ test_processor.enqueueReceivedMQTTMsg(std::move(message));
+ }
+ const auto trigger_results = test_controller_.trigger();
+ CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == 1);
+ const auto flow_file = trigger_results.at(TestConsumeMQTTProcessor::Success).at(0);
+
+ auto string_content = test_controller_.plan->getContent(flow_file);
+ verifyXmlJsonResult(string_content, expected_record_count, add_attributes_as_fields);
+
+ CHECK(*flow_file->getAttribute("record.count") == "2");
+ CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Invalid XML payload does not result in new flow files", "[consumeMQTTTest]") {
+ test_controller_.plan->addController("XMLReader", "XMLReader");
+ test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+
+ const std::string payload = "invalid xml payload";
+ TestConsumeMQTTProcessor::SmartMessage message{
+ std::unique_ptr(
+ new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow(payload.size()),
+ .payload = const_cast(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
+ std::string{"mytopic"}}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+ auto& test_processor = dynamic_cast(consume_mqtt_processor_->getImpl());
+ test_processor.enqueueReceivedMQTTMsg(std::move(message));
+
+ const auto trigger_results = test_controller_.trigger();
+ CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).empty());
+ REQUIRE(LogTestController::getInstance().contains("[error] Failed to read records from MQTT message", 1s));
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Read MQTT message and write it to a flow file", "[consumeMQTTTest]") {
+ std::vector expected_topic_segments;
+ std::string topic;
+
+ SECTION("Single topic segment") {
+ expected_topic_segments = {"mytopic"};
+ topic = "mytopic";
+ }
+
+ SECTION("Multiple topic segments") {
+ expected_topic_segments = {"my", "topic", "segment"};
+ topic = "my/topic/segment";
+ }
+
+ SECTION("Empty topic segment") {
+ expected_topic_segments = {"mytopic", "", "segment"};
+ topic = "mytopic//segment";
+ }
+
+ SECTION("Empty topic segment at the end") {
+ expected_topic_segments = {"mytopic", ""};
+ topic = "mytopic/";
+ }
+
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+
+ const size_t expected_flow_file_count = 2;
+ const std::string payload = "test MQTT payload";
+ for (size_t i = 0; i < expected_flow_file_count; ++i) {
+ TestConsumeMQTTProcessor::SmartMessage message{std::unique_ptr(
+ new MQTTAsync_message{.struct_id = {'M', 'Q', 'T', 'M'}, .struct_version = 1, .payloadlen = gsl::narrow(payload.size()),
+ .payload = const_cast(payload.data()), .qos = 1, .retained = 0, .dup = 0, .msgid = 42, .properties = {}}),
+ std::string{topic}}; // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
+ auto& test_processor = dynamic_cast(consume_mqtt_processor_->getImpl());
+ test_processor.enqueueReceivedMQTTMsg(std::move(message));
+ }
+ const auto trigger_results = test_controller_.trigger();
+ CHECK(trigger_results.at(TestConsumeMQTTProcessor::Success).size() == expected_flow_file_count);
+ for (size_t i = 0; i < expected_flow_file_count; ++i) {
+ const auto flow_file = trigger_results.at(TestConsumeMQTTProcessor::Success).at(i);
+ auto string_content = test_controller_.plan->getContent(flow_file);
+ CHECK(string_content == payload);
+
+ CHECK(*flow_file->getAttribute("mqtt.broker") == "127.0.0.1:1883");
+ CHECK(*flow_file->getAttribute("mqtt.topic") == topic);
+ for (size_t j = 0; j < expected_topic_segments.size(); ++j) {
+ CHECK(*flow_file->getAttribute("mqtt.topic.segment." + std::to_string(j)) == expected_topic_segments[j]);
+ }
+ CHECK(*flow_file->getAttribute("mqtt.qos") == "1");
+ CHECK(*flow_file->getAttribute("mqtt.isDuplicate") == "false");
+ CHECK(*flow_file->getAttribute("mqtt.isRetained") == "false");
+ }
+}
+
+TEST_CASE_METHOD(ConsumeMqttTestFixture, "Test scheduling failure if non-existent recordset reader or writer is set", "[consumeMQTTTest]") {
+ test_controller_.plan->addController("XMLReader", "XMLReader");
+ test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::Topic.name, "mytopic"));
+ SECTION("RecordReader is set to invalid controller service") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "invalid_reader"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+ REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("Controller service 'Record Reader' = 'invalid_reader' not found"));
+ }
+
+ SECTION("RecordWriter is set to invalid controller service") {
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordReader.name, "XMLReader"));
+ REQUIRE(test_controller_.plan->setProperty(consume_mqtt_processor_, minifi::processors::ConsumeMQTT::RecordWriter.name, "invalid_writer"));
+ REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("Controller service 'Record Writer' = 'invalid_writer' not found"));
+ }
+}
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/extensions/mqtt/tests/PublishMQTTTests.cpp b/extensions/mqtt/tests/PublishMQTTTests.cpp
index bcc84f65fb..98ff172806 100644
--- a/extensions/mqtt/tests/PublishMQTTTests.cpp
+++ b/extensions/mqtt/tests/PublishMQTTTests.cpp
@@ -22,62 +22,85 @@
#include "catch2/matchers/catch_matchers_string.hpp"
#include "unit/TestBase.h"
#include "../processors/PublishMQTT.h"
+#include "unit/SingleProcessorTestController.h"
+#include "core/Resource.h"
+#include "controllers/XMLRecordSetWriter.h"
+#include "unit/ProcessorUtils.h"
using namespace std::literals::chrono_literals;
-namespace {
-struct Fixture {
- Fixture() {
- LogTestController::getInstance().setDebug();
- plan_ = testController_.createPlan();
- publishMqttProcessor_ = plan_->addProcessor("PublishMQTT", "publishMqttProcessor");
+namespace org::apache::nifi::minifi::test {
+
+class TestPublishMQTTProcessor : public minifi::processors::PublishMQTT {
+ public:
+ explicit TestPublishMQTTProcessor(minifi::core::ProcessorMetadata metadata)
+ : minifi::processors::PublishMQTT(std::move(metadata)) {}
+
+ void initializeClient() override {
+ }
+
+ bool sendMessage(const std::vector&, const std::string&, const std::string&, const std::shared_ptr&) override {
+ return true;
}
- Fixture(Fixture&&) = delete;
- Fixture(const Fixture&) = delete;
- Fixture& operator=(Fixture&&) = delete;
- Fixture& operator=(const Fixture&) = delete;
+ void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override {
+ minifi::processors::PublishMQTT::onTriggerImpl(context, session);
+ }
+};
- ~Fixture() {
+REGISTER_RESOURCE(TestPublishMQTTProcessor, Processor);
+
+struct PublishMQTTTestFixture {
+ PublishMQTTTestFixture()
+ : test_controller_(utils::make_processor("TestPublishMQTTProcessor")),
+ publish_mqtt_processor_(test_controller_.getProcessor()) {
+ REQUIRE(publish_mqtt_processor_ != nullptr);
+ LogTestController::getInstance().setDebug();
+ }
+
+ PublishMQTTTestFixture(PublishMQTTTestFixture&&) = delete;
+ PublishMQTTTestFixture(const PublishMQTTTestFixture&) = delete;
+ PublishMQTTTestFixture& operator=(PublishMQTTTestFixture&&) = delete;
+ PublishMQTTTestFixture& operator=(const PublishMQTTTestFixture&) = delete;
+
+ ~PublishMQTTTestFixture() {
LogTestController::getInstance().reset();
}
- TestController testController_;
- std::shared_ptr plan_;
- core::Processor* publishMqttProcessor_ = nullptr;
+ SingleProcessorTestController test_controller_;
+ core::Processor* publish_mqtt_processor_ = nullptr;
};
-} // namespace
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyTopic", "[publishMQTTTest]") {
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_),
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyTopic", "[publishMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_),
Catch::Matchers::EndsWith("Process Schedule Operation: PublishMQTT: Topic is required"));
}
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyBrokerURI", "[publishMQTTTest]") {
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic"));
- REQUIRE_THROWS_WITH(plan_->scheduleProcessor(publishMqttProcessor_),
- Catch::Matchers::EndsWith("Expected valid value from \"publishMqttProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyBrokerURI", "[publishMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+ REQUIRE_THROWS_WITH(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_),
+ Catch::Matchers::EndsWith("Expected valid value from \"TestPublishMQTTProcessor::Broker URI\", but got PropertyNotSet (Property Error:2)"));
}
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_EmptyClientID_V_3", "[publishMQTTTest]") {
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic"));
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::MessageExpiryInterval.name, "60 sec"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_EmptyClientID_V_3", "[publishMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::MessageExpiryInterval.name, "60 sec"));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Message Expiry Intervals. Property is not used.", 1s));
}
-TEST_CASE_METHOD(Fixture, "PublishMQTTTest_ContentType_V_3", "[publishMQTTTest]") {
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::Topic.name, "mytopic"));
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
- REQUIRE(publishMqttProcessor_->setProperty(minifi::processors::PublishMQTT::ContentType.name, "text/plain"));
- REQUIRE_NOTHROW(plan_->scheduleProcessor(publishMqttProcessor_));
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTTTest_ContentType_V_3", "[publishMQTTTest]") {
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::ContentType.name, "text/plain"));
+ REQUIRE_NOTHROW(test_controller_.plan->scheduleProcessor(publish_mqtt_processor_));
REQUIRE(LogTestController::getInstance().contains("[warning] MQTT 3.x specification does not support Content Types. Property is not used.", 1s));
}
-TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the number of in-flight messages as a metric") {
- const auto node = publishMqttProcessor_->getResponseNode();
+TEST_CASE_METHOD(PublishMQTTTestFixture, "PublishMQTT can publish the number of in-flight messages as a metric") {
+ const auto node = publish_mqtt_processor_->getResponseNode();
SECTION("heartbeat metric") {
const auto serialized_nodes = minifi::state::response::ResponseNode::serializeAndMergeResponseNodes({node});
@@ -94,3 +117,46 @@ TEST_CASE_METHOD(Fixture, "PublishMQTT can publish the number of in-flight messa
CHECK(it->value == 0.0);
}
}
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test sending XML message records", "[publishMQTTTest]") {
+ test_controller_.plan->addController("JsonTreeReader", "JsonTreeReader");
+ auto xml_writer = test_controller_.plan->addController("XMLRecordSetWriter", "XMLRecordSetWriter");
+ REQUIRE(test_controller_.plan->setProperty(xml_writer, minifi::standard::XMLRecordSetWriter::NameOfRootTag.name, "root"));
+ REQUIRE(test_controller_.plan->setProperty(xml_writer, minifi::standard::XMLRecordSetWriter::NameOfRecordTag.name, "record"));
+
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::RecordReader.name, "JsonTreeReader"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::RecordWriter.name, "XMLRecordSetWriter"));
+
+ const auto trigger_results = test_controller_.trigger(R"([{"element1": "value1"}, {"element2": "42"}])");
+ CHECK(trigger_results.at(TestPublishMQTTProcessor::Success).size() == 2);
+ const auto flow_file_1 = trigger_results.at(TestPublishMQTTProcessor::Success).at(0);
+
+ auto string_content = test_controller_.plan->getContent(flow_file_1);
+ CHECK(string_content == R"(value1)");
+
+ const auto flow_file_2 = trigger_results.at(TestPublishMQTTProcessor::Success).at(1);
+ string_content = test_controller_.plan->getContent(flow_file_2);
+ CHECK(string_content == R"(42)");
+}
+
+TEST_CASE_METHOD(PublishMQTTTestFixture, "Test scheduling failure if non-existent recordset reader or writer is set", "[publishMQTTTest]") {
+ test_controller_.plan->addController("XMLReader", "XMLReader");
+ test_controller_.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter");
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::AbstractMQTTProcessor::BrokerURI.name, "127.0.0.1:1883"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::Topic.name, "mytopic"));
+ SECTION("RecordReader is set to invalid controller service") {
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::RecordReader.name, "invalid_reader"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::RecordWriter.name, "JsonRecordSetWriter"));
+ REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("Controller service 'Record Reader' = 'invalid_reader' not found"));
+ }
+
+ SECTION("RecordWriter is set to invalid controller service") {
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::RecordReader.name, "XMLReader"));
+ REQUIRE(test_controller_.plan->setProperty(publish_mqtt_processor_, minifi::processors::PublishMQTT::RecordWriter.name, "invalid_writer"));
+ REQUIRE_THROWS_WITH(test_controller_.trigger(), Catch::Matchers::EndsWith("Controller service 'Record Writer' = 'invalid_writer' not found"));
+ }
+}
+
+} // namespace org::apache::nifi::minifi::test
diff --git a/extensions/standard-processors/processors/ConvertRecord.cpp b/extensions/standard-processors/processors/ConvertRecord.cpp
index fb85990c1d..cf37cc1919 100644
--- a/extensions/standard-processors/processors/ConvertRecord.cpp
+++ b/extensions/standard-processors/processors/ConvertRecord.cpp
@@ -20,17 +20,20 @@
#include "nonstd/expected.hpp"
#include "utils/GeneralUtils.h"
#include "utils/ProcessorConfigUtils.h"
+#include "minifi-cpp/utils/gsl.h"
namespace org::apache::nifi::minifi::processors {
void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
- record_set_reader_ = utils::parseControllerService(context, RecordReader, getUUID());
- record_set_writer_ = utils::parseControllerService(context, RecordWriter, getUUID());
+ record_converter_ = core::RecordConverter{
+ .record_set_reader = utils::parseControllerService(context, RecordReader, getUUID()),
+ .record_set_writer = utils::parseControllerService(context, RecordWriter, getUUID())
+ };
include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles);
}
void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
- gsl_Expects(record_set_reader_ && record_set_writer_);
+ gsl_Expects(record_converter_);
const auto flow_file = session.get();
if (!flow_file) {
context.yield();
@@ -39,7 +42,7 @@ void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSessio
nonstd::expected record_set;
session.read(flow_file, [this, &record_set](const std::shared_ptr& input_stream) {
- record_set = record_set_reader_->read(*input_stream);
+ record_set = record_converter_->record_set_reader->read(*input_stream);
return gsl::narrow(input_stream->size());
});
if (!record_set) {
@@ -55,7 +58,7 @@ void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSessio
return;
}
- record_set_writer_->write(*record_set, flow_file, session);
+ record_converter_->record_set_writer->write(*record_set, flow_file, session);
flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name, std::to_string(record_set->size()));
session.transfer(flow_file, Success);
}
diff --git a/extensions/standard-processors/processors/ConvertRecord.h b/extensions/standard-processors/processors/ConvertRecord.h
index daf1ffe27e..5827b9f62f 100644
--- a/extensions/standard-processors/processors/ConvertRecord.h
+++ b/extensions/standard-processors/processors/ConvertRecord.h
@@ -19,13 +19,13 @@
#include
#include
#include
+#include
#include "core/AbstractProcessor.h"
#include "core/ProcessSession.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/RelationshipDefinition.h"
-#include "controllers/RecordSetReader.h"
-#include "controllers/RecordSetWriter.h"
+#include "minifi-cpp/controllers/RecordConverter.h"
namespace org::apache::nifi::minifi::processors {
@@ -77,8 +77,7 @@ class ConvertRecord : public core::AbstractProcessor {
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
private:
- std::shared_ptr record_set_reader_;
- std::shared_ptr record_set_writer_;
+ std::optional record_converter_;
bool include_zero_record_flow_files_ = true;
};
diff --git a/extensions/standard-processors/processors/SplitRecord.cpp b/extensions/standard-processors/processors/SplitRecord.cpp
index 5cc4f31539..fb32a5baa2 100644
--- a/extensions/standard-processors/processors/SplitRecord.cpp
+++ b/extensions/standard-processors/processors/SplitRecord.cpp
@@ -19,12 +19,16 @@
#include "core/Resource.h"
#include "nonstd/expected.hpp"
#include "utils/GeneralUtils.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "minifi-cpp/utils/gsl.h"
namespace org::apache::nifi::minifi::processors {
void SplitRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
- record_set_reader_ = utils::parseControllerService(context, RecordReader, getUUID());
- record_set_writer_ = utils::parseControllerService(context, RecordWriter, getUUID());
+ record_converter_ = core::RecordConverter{
+ .record_set_reader = utils::parseControllerService(context, RecordReader, getUUID()),
+ .record_set_writer = utils::parseControllerService(context, RecordWriter, getUUID())
+ };
}
nonstd::expected SplitRecord::readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file) {
@@ -36,6 +40,7 @@ nonstd::expected SplitRecord::readRecordsPerSplit(core
}
void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+ gsl_Expects(record_converter_);
const auto original_flow_file = session.get();
if (!original_flow_file) {
context.yield();
@@ -51,7 +56,7 @@ void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession&
nonstd::expected record_set;
session.read(original_flow_file, [this, &record_set](const std::shared_ptr& input_stream) {
- record_set = record_set_reader_->read(*input_stream);
+ record_set = record_converter_->record_set_reader->read(*input_stream);
return gsl::narrow(input_stream->size());
});
if (!record_set) {
@@ -84,7 +89,7 @@ void SplitRecord::onTrigger(core::ProcessContext& context, core::ProcessSession&
split_flow_file->setAttribute("fragment.count", std::to_string(fragment_count));
split_flow_file->setAttribute("segment.original.filename", original_flow_file->getAttribute("filename").value_or(""));
- record_set_writer_->write(slice_record_set, split_flow_file, session);
+ record_converter_->record_set_writer->write(slice_record_set, split_flow_file, session);
session.transfer(split_flow_file, Splits);
++fragment_index;
}
diff --git a/extensions/standard-processors/processors/SplitRecord.h b/extensions/standard-processors/processors/SplitRecord.h
index 5057d7e1c2..c594024d3f 100644
--- a/extensions/standard-processors/processors/SplitRecord.h
+++ b/extensions/standard-processors/processors/SplitRecord.h
@@ -16,6 +16,8 @@
*/
#pragma once
+#include
+
#include "minifi-cpp/core/Annotation.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
@@ -24,8 +26,7 @@
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/RelationshipDefinition.h"
#include "minifi-cpp/core/logging/Logger.h"
-#include "minifi-cpp/controllers/RecordSetReader.h"
-#include "minifi-cpp/controllers/RecordSetWriter.h"
+#include "minifi-cpp/controllers/RecordConverter.h"
#include "core/AbstractProcessor.h"
namespace org::apache::nifi::minifi::processors {
@@ -87,8 +88,7 @@ class SplitRecord final : public core::AbstractProcessor {
private:
static nonstd::expected readRecordsPerSplit(core::ProcessContext& context, const core::FlowFile& original_flow_file);
- std::shared_ptr record_set_reader_;
- std::shared_ptr record_set_writer_;
+ std::optional record_converter_;
};
} // namespace org::apache::nifi::minifi::processors
diff --git a/minifi-api/include/minifi-cpp/controllers/RecordConverter.h b/minifi-api/include/minifi-cpp/controllers/RecordConverter.h
new file mode 100644
index 0000000000..295dd2013d
--- /dev/null
+++ b/minifi-api/include/minifi-cpp/controllers/RecordConverter.h
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include
+
+#include "RecordSetReader.h"
+#include "RecordSetWriter.h"
+#include "minifi-cpp/utils/gsl.h"
+
+namespace org::apache::nifi::minifi::core {
+
+struct RecordConverter {
+ gsl::not_null> record_set_reader;
+ gsl::not_null> record_set_writer;
+};
+
+} // namespace org::apache::nifi::minifi::core