Skip to content

Commit a1ee3af

Browse files
lordgamezfgerlits
authored andcommitted
MINIFICPP-2603 Add Record Reader and Record Writer properties to MQTT processors
Signed-off-by: Ferenc Gerlits <fgerlits@gmail.com> Closes #2004
1 parent 1976d80 commit a1ee3af

File tree

24 files changed

+804
-183
lines changed

24 files changed

+804
-183
lines changed

PROCESSORS.md

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ In the list below, the names of required properties appear in bold. Any other pr
392392

393393
### Description
394394

395-
This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. The the payload of the MQTT message becomes content of a FlowFile
395+
This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. The the payload of the MQTT message becomes content of a FlowFile. If Record Reader and Record Writer are set, then the MQTT message specific attributes are not set in the flow file, because different attributes can be set for different records. In this case if Add Attributes As Fields is set to true, the attributes will be added to each record as fields.
396396

397397
### Properties
398398

@@ -411,6 +411,9 @@ In the list below, the names of required properties appear in bold. Any other pr
411411
| Attribute From Content Type | | | Name of FlowFile attribute to be filled from content type of received message. MQTT 5.x only. |
412412
| Topic Alias Maximum | 0 | | Maximum number of topic aliases to use. If set to 0, then topic aliases cannot be used. MQTT 5.x only. |
413413
| Receive Maximum | 65535 | | Maximum number of unacknowledged messages allowed. MQTT 5.x only. |
414+
| Record Reader | | | The Record Reader to use for parsing received MQTT Messages into Records. |
415+
| Record Writer | | | The Record Writer to use for serializing Records before writing them to a FlowFile. |
416+
| Add Attributes As Fields | true | true<br/>false | If setting this property to true, default fields are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained. |
414417
| **Quality of Service** | 0 | 0<br/>1<br/>2 | The Quality of Service (QoS) of messages. |
415418
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
416419
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |
@@ -435,10 +438,15 @@ In the list below, the names of required properties appear in bold. Any other pr
435438

436439
### Output Attributes
437440

438-
| Attribute | Relationship | Description |
439-
|-------------|--------------|---------------------------|
440-
| mqtt.broker | | URI of the sending broker |
441-
| mqtt.topic | | Topic of the message |
441+
| Attribute | Relationship | Description |
442+
|----------------------|--------------|---------------------------------------------------------------------------------------------------------------------------------------|
443+
| mqtt.broker | | URI of the sending broker |
444+
| mqtt.topic | | Topic of the message |
445+
| mqtt.topic.segment.n | | The nth topic segment of the message |
446+
| mqtt.qos | | The quality of service for this message. |
447+
| mqtt.isDuplicate | | Whether or not this message might be a duplicate of one which has already been received. |
448+
| mqtt.isRetained | | Whether or not this message was from a current publisher, or was "retained" by the server as the last message published on the topic. |
449+
| record.count | | The number of records received |
442450

443451

444452
## ConsumeWindowsEventLog
@@ -2198,6 +2206,8 @@ In the list below, the names of required properties appear in bold. Any other pr
21982206
| Retain | false | true<br/>false | Retain published message in broker |
21992207
| Message Expiry Interval | | | Time while message is valid and will be forwarded by broker. MQTT 5.x only. |
22002208
| Content Type | | | Content type of the message. MQTT 5.x only.<br/>**Supports Expression Language: true** |
2209+
| Record Reader | | | The Record Reader to use for parsing the incoming FlowFile into Records. |
2210+
| Record Writer | | | The Record Writer to use for serializing Records before publishing them as an MQTT Message. |
22012211
| **Quality of Service** | 0 | 0<br/>1<br/>2 | The Quality of Service (QoS) of messages. |
22022212
| Connection Timeout | 10 sec | | Maximum time interval the client will wait for the network connection to the MQTT broker |
22032213
| Keep Alive Interval | 60 sec | | Defines the maximum time interval between messages sent or received |

docker/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ prometheus-api-client==0.5.5
1111
humanfriendly==10.0
1212
requests<2.29 # https://github.com/docker/docker-py/issues/3113
1313
couchbase==4.3.5
14+
paho-mqtt==2.1.0

docker/test/integration/cluster/DockerTestCluster.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
3838
from .checkers.ModbusChecker import ModbusChecker
3939
from .checkers.CouchbaseChecker import CouchbaseChecker
40+
from .checkers.MqttHelper import MqttHelper
4041
from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check
4142

4243

@@ -58,6 +59,7 @@ def __init__(self, context, feature_id):
5859
self.modbus_checker = ModbusChecker(self.container_communicator)
5960
self.couchbase_checker = CouchbaseChecker()
6061
self.kafka_checker = KafkaHelper(self.container_communicator, feature_id)
62+
self.mqtt_helper = MqttHelper()
6163

6264
def cleanup(self):
6365
self.container_store.cleanup()
@@ -457,3 +459,6 @@ def enable_ssl_in_nifi(self):
457459

458460
def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
459461
return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
462+
463+
def publish_test_mqtt_message(self, topic: str, message: str):
464+
self.mqtt_helper.publish_test_mqtt_message(topic, message)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import paho.mqtt.client as mqtt
16+
17+
18+
class MqttHelper:
19+
def publish_test_mqtt_message(self, topic: str, message: str):
20+
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "docker_test_client_id")
21+
client.connect("localhost", 1883, 60)
22+
client.publish(topic, message)
23+
client.disconnect()

docker/test/integration/cluster/containers/FlowContainer.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ def add_start_node(self, node):
4747
def add_controller(self, controller):
4848
self.controllers.append(controller)
4949

50+
def get_controller(self, name):
51+
for controller in self.controllers:
52+
if controller.name == name:
53+
return controller
54+
raise ValueError(f"Controller with name '{name}' not found")
55+
5056
def add_parameter_to_flow_config(self, parameter_context_name, parameter_name, parameter_value):
5157
if parameter_context_name in self.parameter_contexts:
5258
self.parameter_contexts[parameter_context_name].append(Parameter(parameter_name, parameter_value))

docker/test/integration/cluster/containers/MqttBrokerContainer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def deploy(self):
3434
self.image_store.get_image(self.get_engine()),
3535
detach=True,
3636
name=self.name,
37+
ports={'1883/tcp': 1883},
3738
network=self.network.name,
3839
entrypoint=self.command)
3940
logging.info('Added container \'%s\'', self.name)

docker/test/integration/features/MiNiFi_integration_test_driver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,3 +517,6 @@ def enable_ssl_in_nifi(self):
517517

518518
def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
519519
assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
520+
521+
def publish_test_mqtt_message(self, topic, message):
522+
self.cluster.publish_test_mqtt_message(topic, message)

docker/test/integration/features/mqtt.feature

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
9090
And a ConsumeMQTT processor set up to communicate with an MQTT broker instance
9191
And the "MQTT Version" property of the ConsumeMQTT processor is set to "<version>"
9292
And a PutFile processor with the "Directory" property set to "/tmp/output"
93+
And a LogAttribute processor
9394
And "ConsumeMQTT" processor is a start node
9495
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
96+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
9597

9698
And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
9799

@@ -101,6 +103,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
101103
And a file with the content "test" is placed in "/tmp/input"
102104
And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
103105
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)"
106+
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 60 seconds
107+
And the Minifi logs contain the following message: "key:mqtt.topic value:testtopic" in less than 1 seconds
108+
And the Minifi logs contain the following message: "key:mqtt.topic.segment.0 value:testtopic" in less than 1 seconds
109+
And the Minifi logs contain the following message: "key:mqtt.qos value:0" in less than 1 seconds
110+
And the Minifi logs contain the following message: "key:mqtt.isDuplicate value:false" in less than 1 seconds
111+
And the Minifi logs contain the following message: "key:mqtt.isRetained value:false" in less than 1 seconds
104112

105113
Examples: MQTT versions
106114
| version |
@@ -505,3 +513,48 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
505513
And "publisher-client" flow is killed
506514
And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client"
507515
And a flowfile with the content "last_will_message" is placed in the monitored directory in less than 60 seconds
516+
517+
Scenario: A MiNiFi instance uses record reader and writer to convert consumed message from an MQTT broker
518+
Given a XMLReader controller service is set up
519+
And a JsonRecordSetWriter controller service is set up with "Array" output grouping
520+
And a ConsumeMQTT processor with the "Topic" property set to "test/my/topic"
521+
And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.x AUTO"
522+
And the "Record Reader" property of the ConsumeMQTT processor is set to "XMLReader"
523+
And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
524+
And a PutFile processor with the "Directory" property set to "/tmp/output"
525+
And a LogAttribute processor
526+
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
527+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
528+
And an MQTT broker is set up in correspondence with the ConsumeMQTT
529+
530+
When both instances start up
531+
And a test message "<root><element>test</element></root>" is published to the MQTT broker on topic "test/my/topic"
532+
533+
Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
534+
And a flowfile with the JSON content '[{"_isRetained": false, "_isDuplicate": false, "_qos": 0, "_topicSegments": ["test", "my", "topic"], "_topic": "test/my/topic", "element": "test"}]' is placed in the monitored directory in less than 60 seconds
535+
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
536+
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
537+
538+
Scenario: A MiNiFi instance uses record reader and writer to convert and publish records to an MQTT broker
539+
Given a JsonTreeReader controller service is set up
540+
And a XMLRecordSetWriter controller service is set up
541+
And the "Name of Record Tag" property of the XMLRecordSetWriter controller is set to "record"
542+
And the "Name of Root Tag" property of the XMLRecordSetWriter controller is set to "root"
543+
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
544+
And a file with the content '[{"string": "test"}, {"int": 42}]' is present in '/tmp/input'
545+
And a PublishMQTT processor set up to communicate with an MQTT broker instance
546+
And the "MQTT Version" property of the PublishMQTT processor is set to "3.x AUTO"
547+
And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
548+
And the "Record Writer" property of the PublishMQTT processor is set to "XMLRecordSetWriter"
549+
And a UpdateAttribute processor with the "filename" property set to "${UUID()}.xml"
550+
And a PutFile processor with the "Directory" property set to "/tmp/output"
551+
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
552+
And the "success" relationship of the PublishMQTT processor is connected to the UpdateAttribute
553+
And the "success" relationship of the UpdateAttribute processor is connected to the PutFile
554+
And an MQTT broker is set up in correspondence with the PublishMQTT
555+
556+
When both instances start up
557+
558+
Then two flowfiles with the contents '<?xml version="1.0"?><root><record><string>test</string></record></root>' and '<?xml version="1.0"?><root><record><int>42</int></record></root>' are placed in the monitored directory in less than 60 seconds
559+
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(72 bytes\)"
560+
And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(64 bytes\)"

0 commit comments

Comments
 (0)