Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ In the list below, the names of required properties appear in bold. Any other pr

### Description

This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. The the payload of the MQTT message becomes content of a FlowFile
This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. The the payload of the MQTT message becomes content of a FlowFile. If Record Reader and Record Writer are set, then the MQTT message specific attributes are not set in the flow file, because different attributes can be set for different records. In this case if Add Attributes As Fields is set to true, the attributes will be added to each record as fields.

### Properties

Expand All @@ -411,6 +411,9 @@ In the list below, the names of required properties appear in bold. Any other pr
| Attribute From Content Type | | | Name of FlowFile attribute to be filled from content type of received message. MQTT 5.x only. |
| Topic Alias Maximum | 0 | | Maximum number of topic aliases to use. If set to 0, then topic aliases cannot be used. MQTT 5.x only. |
| Receive Maximum | 65535 | | Maximum number of unacknowledged messages allowed. MQTT 5.x only. |
| Record Reader | | | The Record Reader to use for parsing received MQTT Messages into Records. |
| Record Writer | | | The Record Writer to use for serializing Records before writing them to a FlowFile. |
| Add Attributes As Fields | true | true<br/>false | If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained. |
| **Quality of Service** | 0 | 0<br/>1<br/>2 | The Quality of Service (QoS) of messages. |
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |
Expand All @@ -435,10 +438,15 @@ In the list below, the names of required properties appear in bold. Any other pr

### Output Attributes

| Attribute | Relationship | Description |
|-------------|--------------|---------------------------|
| mqtt.broker | | URI of the sending broker |
| mqtt.topic | | Topic of the message |
| Attribute | Relationship | Description |
|----------------------|--------------|---------------------------------------------------------------------------------------------------------------------------------------|
| mqtt.broker | | URI of the sending broker |
| mqtt.topic | | Topic of the message |
| mqtt.topic.segment.n | | The nth topic segment of the message |
| mqtt.qos | | The quality of service for this message. |
| mqtt.isDuplicate | | Whether or not this message might be a duplicate of one which has already been received. |
| mqtt.isRetained | | Whether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic. |
| record.count | | The number of records received |


## ConsumeWindowsEventLog
Expand Down Expand Up @@ -2198,6 +2206,8 @@ In the list below, the names of required properties appear in bold. Any other pr
| Retain | false | true<br/>false | Retain published message in broker |
| Message Expiry Interval | | | Time while message is valid and will be forwarded by broker. MQTT 5.x only. |
| Content Type | | | Content type of the message. MQTT 5.x only.<br/>**Supports Expression Language: true** |
| Record Reader | | | The Record Reader to use for parsing the incoming FlowFile into Records. |
| Record Writer | | | The Record Writer to use for serializing Records before publishing them as an MQTT Message. |
| **Quality of Service** | 0 | 0<br/>1<br/>2 | The Quality of Service (QoS) of messages. |
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |
Expand Down
1 change: 1 addition & 0 deletions docker/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ prometheus-api-client==0.5.5
humanfriendly==10.0
requests<2.29 # https://github.com/docker/docker-py/issues/3113
couchbase==4.3.5
paho-mqtt==2.1.0
5 changes: 5 additions & 0 deletions docker/test/integration/cluster/DockerTestCluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
from .checkers.ModbusChecker import ModbusChecker
from .checkers.CouchbaseChecker import CouchbaseChecker
from .checkers.MqttHelper import MqttHelper
from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check


Expand All @@ -58,6 +59,7 @@ def __init__(self, context, feature_id):
self.modbus_checker = ModbusChecker(self.container_communicator)
self.couchbase_checker = CouchbaseChecker()
self.kafka_checker = KafkaHelper(self.container_communicator, feature_id)
self.mqtt_helper = MqttHelper()

def cleanup(self):
self.container_store.cleanup()
Expand Down Expand Up @@ -457,3 +459,6 @@ def enable_ssl_in_nifi(self):

def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)

def publish_test_mqtt_message(self, topic: str, message: str):
self.mqtt_helper.publish_test_mqtt_message(topic, message)
23 changes: 23 additions & 0 deletions docker/test/integration/cluster/checkers/MqttHelper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paho.mqtt.client as mqtt


class MqttHelper:
def publish_test_mqtt_message(self, topic: str, message: str):
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id")
client.connect("localhost", 1883, 60)
client.publish(topic, message)
client.disconnect()
6 changes: 6 additions & 0 deletions docker/test/integration/cluster/containers/FlowContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ def add_start_node(self, node):
def add_controller(self, controller):
self.controllers.append(controller)

def get_controller(self, name):
for controller in self.controllers:
if controller.name == name:
return controller
raise ValueError(f"Controller with name '{name}' not found")

def add_parameter_to_flow_config(self, parameter_context_name, parameter_name, parameter_value):
if parameter_context_name in self.parameter_contexts:
self.parameter_contexts[parameter_context_name].append(Parameter(parameter_name, parameter_value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def deploy(self):
self.image_store.get_image(self.get_engine()),
detach=True,
name=self.name,
ports={'1883/tcp': 1883},
network=self.network.name,
entrypoint=self.command)
logging.info('Added container \'%s\'', self.name)
Original file line number Diff line number Diff line change
Expand Up @@ -517,3 +517,6 @@ def enable_ssl_in_nifi(self):

def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)

def publish_test_mqtt_message(self, topic, message):
self.cluster.publish_test_mqtt_message(topic, message)
53 changes: 53 additions & 0 deletions docker/test/integration/features/mqtt.feature
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor
And "ConsumeMQTT" processor is a start node
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT

Expand All @@ -101,6 +103,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And a file with the content "test" is placed in "/tmp/input"
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 60 seconds
And the Minifi logs contain the following message: "key:mqtt.topic value:testtopic" in less than 1 seconds
And the Minifi logs contain the following message: "key:mqtt.topic.segment.0 value:testtopic" in less than 1 seconds
And the Minifi logs contain the following message: "key:mqtt.qos value:0" in less than 1 seconds
And the Minifi logs contain the following message: "key:mqtt.isDuplicate value:false" in less than 1 seconds
And the Minifi logs contain the following message: "key:mqtt.isRetained value:false" in less than 1 seconds

Examples: MQTT versions
| version |
Expand Down Expand Up @@ -505,3 +513,48 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And "publisher-client" flow is killed
And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client"
And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds

Scenario: A MiNiFi instance uses record reader and writer to convert consumed message from an MQTT broker
Given a XMLReader controller service is set up
And a JsonRecordSetWriter controller service is set up with "Array" output grouping
And a ConsumeMQTT processor with the "Topic" property set to "test/my/topic"
And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.x AUTO"
And the "Record Reader" property of the ConsumeMQTT processor is set to "XMLReader"
And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute
And an MQTT broker is set up in correspondence with the ConsumeMQTT

When both instances start up
And a test message "<root><element>test</element></root>" is published to the MQTT broker on topic "test/my/topic"

Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
And a flowfile with the JSON content '[{"_isRetained": false, "_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], "_topic": "test/my/topic", "element": "test"}]' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds

Scenario: A MiNiFi instance uses record reader and writer to convert and publish records to an MQTT broker
Given a JsonTreeReader controller service is set up
And a XMLRecordSetWriter controller service is set up
And the "Name of Record Tag" property of the XMLRecordSetWriter controller is set to "record"
And the "Name of Root Tag" property of the XMLRecordSetWriter controller is set to "root"
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '[{"string": "test"}, {"int": 42}]' is present in '/tmp/input'
And a PublishMQTT processor set up to communicate with an MQTT broker instance
And the "MQTT Version" property of the PublishMQTT processor is set to "3.x AUTO"
And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
And the "Record Writer" property of the PublishMQTT processor is set to "XMLRecordSetWriter"
And a UpdateAttribute processor with the "filename" property set to "${UUID()}.xml"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
And the "success" relationship of the PublishMQTT processor is connected to the UpdateAttribute
And the "success" relationship of the UpdateAttribute processor is connected to the PutFile
And an MQTT broker is set up in correspondence with the PublishMQTT

When both instances start up

Then two flowfiles with the contents '<?xml version="1.0"?><root><record><string>test</string></record></root>' and '<?xml version="1.0"?><root><record><int>42</int></record></root>' are placed in the monitored directory in less than 60 seconds
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(72 bytes\)"
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(64 bytes\)"
Loading
Loading