Skip to content

Commit 1284791

Browse files
committed
MINIFICPP-2590 Add Sparkplug B support for PublishMQTT processor
1 parent 153ef73 commit 1284791

File tree

10 files changed

+796
-5
lines changed

10 files changed

+796
-5
lines changed

CONTROLLERS.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ limitations under the License.
3030
- [RocksDbStateStorage](#RocksDbStateStorage)
3131
- [SmbConnectionControllerService](#SmbConnectionControllerService)
3232
- [SparkplugBReader](#SparkplugBReader)
33+
- [SparkplugBWriter](#SparkplugBWriter)
3334
- [SSLContextService](#SSLContextService)
3435
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
3536
- [VolatileMapStateStorage](#VolatileMapStateStorage)
@@ -294,6 +295,20 @@ In the list below, the names of required properties appear in bold. Any other pr
294295
|------|---------------|------------------|-------------|
295296

296297

298+
## SparkplugBWriter
299+
300+
### Description
301+
302+
Serializes recordset to Sparkplug B messages and writes them into a FlowFile. This writer is typically used with MQTT processors like PublishMQTT.
303+
304+
### Properties
305+
306+
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
307+
308+
| Name | Default Value | Allowable Values | Description |
309+
|------|---------------|------------------|-------------|
310+
311+
297312
## SSLContextService
298313

299314
### Description

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
8585
| Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON |
8686
| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON |
8787
| Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON |
88-
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader) | -DENABLE_MQTT=ON |
88+
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader)<br/>[SparkplugBWriter](PROCESSORS.md#sparkplugbwriter) | -DENABLE_MQTT=ON |
8989
| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON |
9090
| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON |
9191
| PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON |

docker/test/integration/features/mqtt.feature

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,3 +579,39 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
579579
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
580580
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
581581
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
582+
583+
Scenario: A MiNiFi instance publishes then consumes Sparkplug message through MQTT broker
584+
Given an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
585+
586+
And a SparkplugBReader controller service is set up in the "consumer-client" flow
587+
And a JsonRecordSetWriter controller service is set up with "Array" output grouping in the "consumer-client" flow
588+
And a SparkplugBWriter controller service is set up in the "publisher-client" flow
589+
And a JsonTreeReader controller service is set up in the "publisher-client" flow
590+
591+
And a file with the content '{"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","uuid":"test-uuid"}' is present in '/tmp/input'
592+
593+
And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
594+
And a PublishMQTT processor in the "publisher-client" flow
595+
And the "Topic" property of the PublishMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
596+
And the "MQTT Version" property of the PublishMQTT processor is set to "3.1.1"
597+
And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
598+
And the "Record Writer" property of the PublishMQTT processor is set to "SparkplugBWriter"
599+
600+
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
601+
602+
And a ConsumeMQTT processor in the "consumer-client" flow
603+
And the "Topic" property of the ConsumeMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
604+
And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.1.1"
605+
And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader"
606+
And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
607+
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
608+
And a LogAttribute processor with the "Log Payload" property set to "true" in the "consumer-client" flow
609+
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
610+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
611+
612+
When all instances start up
613+
614+
Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
615+
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
616+
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
617+
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds

docker/test/integration/features/steps/steps.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter
3030
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
3131
from minifi.controllers.SparkplugBReader import SparkplugBReader
32+
from minifi.controllers.SparkplugBWriter import SparkplugBWriter
3233

3334
from behave import given, then, when
3435
from behave.model_describe import ModelDescriptor
@@ -468,13 +469,30 @@ def step_impl(context):
468469
context.test.start('mqtt-broker')
469470

470471

471-
@given("a SparkplugBReader controller service is set up")
472-
def step_impl(context):
472+
@given("a SparkplugBReader controller service is set up in the \"{minifi_container_name}\" flow")
473+
def step_impl(context, minifi_container_name: str):
473474
sparkplug_record_set_reader = SparkplugBReader("SparkplugBReader")
474-
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
475+
container = context.test.acquire_container(context=context, name=minifi_container_name)
475476
container.add_controller(sparkplug_record_set_reader)
476477

477478

479+
@given("a SparkplugBWriter controller service is set up in the \"{minifi_container_name}\" flow")
480+
def step_impl(context, minifi_container_name: str):
481+
sparkplug_record_set_writer = SparkplugBWriter("SparkplugBWriter")
482+
container = context.test.acquire_container(context=context, name=minifi_container_name)
483+
container.add_controller(sparkplug_record_set_writer)
484+
485+
486+
@given("a SparkplugBReader controller service is set up")
487+
def step_impl(context):
488+
context.execute_steps("given a SparkplugBReader controller service is set up in the \"minifi-cpp-flow\" flow")
489+
490+
491+
@given("a SparkplugBWriter controller service is set up")
492+
def step_impl(context):
493+
context.execute_steps("given a SparkplugBWriter controller service is set up in the \"minifi-cpp-flow\" flow")
494+
495+
478496
@when("a test Sparkplug payload is published to the topic \"{topic}\"")
479497
def step_impl(context, topic):
480498
context.test.publish_test_sparkplug_payload(topic)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
from ..core.ControllerService import ControllerService
18+
19+
20+
class SparkplugBWriter(ControllerService):
21+
def __init__(self, name=None):
22+
super(SparkplugBWriter, self).__init__(name=name)
23+
self.service_class = 'SparkplugBWriter'

0 commit comments

Comments
 (0)