Skip to content

Commit b8f3bd5

Browse files
committed
MINIFICPP-2590 Add Sparkplug B support for PublishMQTT processor
1 parent 8d75557 commit b8f3bd5

File tree

10 files changed

+800
-4
lines changed

10 files changed

+800
-4
lines changed

CONTROLLERS.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ limitations under the License.
3030
- [RocksDbStateStorage](#RocksDbStateStorage)
3131
- [SmbConnectionControllerService](#SmbConnectionControllerService)
3232
- [SparkplugBReader](#SparkplugBReader)
33+
- [SparkplugBWriter](#SparkplugBWriter)
3334
- [SSLContextService](#SSLContextService)
3435
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
3536
- [VolatileMapStateStorage](#VolatileMapStateStorage)
@@ -295,6 +296,20 @@ In the list below, the names of required properties appear in bold. Any other pr
295296
|------|---------------|------------------|-------------|
296297

297298

299+
## SparkplugBWriter
300+
301+
### Description
302+
303+
Serializes recordset to Sparkplug B messages and writes them into a FlowFile. This writer is typically used with MQTT processors like PublishMQTT.
304+
305+
### Properties
306+
307+
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
308+
309+
| Name | Default Value | Allowable Values | Description |
310+
|------|---------------|------------------|-------------|
311+
312+
298313
## SSLContextService
299314

300315
### Description

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
8585
| Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON |
8686
| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON |
8787
| Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON |
88-
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader) | -DENABLE_MQTT=ON |
88+
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader)<br/>[SparkplugBWriter](PROCESSORS.md#sparkplugbwriter) | -DENABLE_MQTT=ON |
8989
| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON |
9090
| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON |
9191
| PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON |

docker/test/integration/features/mqtt.feature

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -579,3 +579,39 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
579579
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
580580
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
581581
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
582+
583+
Scenario: A MiNiFi instance publishes then consumes Sparkplug message through MQTT broker
584+
Given an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT
585+
586+
And a SparkplugBReader controller service is set up in the "consumer-client" flow
587+
And a JsonRecordSetWriter controller service is set up with "Array" output grouping in the "consumer-client" flow
588+
And a SparkplugBWriter controller service is set up in the "publisher-client" flow
589+
And a JsonTreeReader controller service is set up in the "publisher-client" flow
590+
591+
And a file with the content '{"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","uuid":"test-uuid"}' is present in '/tmp/input'
592+
593+
And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
594+
And a PublishMQTT processor in the "publisher-client" flow
595+
And the "Topic" property of the PublishMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
596+
And the "MQTT Version" property of the PublishMQTT processor is set to "3.1.1"
597+
And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
598+
And the "Record Writer" property of the PublishMQTT processor is set to "SparkplugBWriter"
599+
600+
And the "success" relationship of the GetFile processor is connected to the PublishMQTT
601+
602+
And a ConsumeMQTT processor in the "consumer-client" flow
603+
And the "Topic" property of the ConsumeMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
604+
And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.1.1"
605+
And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader"
606+
And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
607+
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
608+
And a LogAttribute processor with the "Log Payload" property set to "true" in the "consumer-client" flow
609+
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
610+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
611+
612+
When all instances start up
613+
614+
Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
615+
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
616+
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
617+
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds

docker/test/integration/features/steps/steps.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
3131
from minifi.controllers.XMLReader import XMLReader
3232
from minifi.controllers.SparkplugBReader import SparkplugBReader
33+
from minifi.controllers.SparkplugBWriter import SparkplugBWriter
3334

3435
from behave import given, then, when
3536
from behave.model_describe import ModelDescriptor
@@ -507,13 +508,30 @@ def step_impl(context):
507508
context.test.start('mqtt-broker')
508509

509510

510-
@given("a SparkplugBReader controller service is set up")
511-
def step_impl(context):
511+
@given("a SparkplugBReader controller service is set up in the \"{minifi_container_name}\" flow")
512+
def step_impl(context, minifi_container_name: str):
512513
sparkplug_record_set_reader = SparkplugBReader("SparkplugBReader")
513-
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
514+
container = context.test.acquire_container(context=context, name=minifi_container_name)
514515
container.add_controller(sparkplug_record_set_reader)
515516

516517

518+
@given("a SparkplugBWriter controller service is set up in the \"{minifi_container_name}\" flow")
519+
def step_impl(context, minifi_container_name: str):
520+
sparkplug_record_set_writer = SparkplugBWriter("SparkplugBWriter")
521+
container = context.test.acquire_container(context=context, name=minifi_container_name)
522+
container.add_controller(sparkplug_record_set_writer)
523+
524+
525+
@given("a SparkplugBReader controller service is set up")
526+
def step_impl(context):
527+
context.execute_steps("given a SparkplugBReader controller service is set up in the \"minifi-cpp-flow\" flow")
528+
529+
530+
@given("a SparkplugBWriter controller service is set up")
531+
def step_impl(context):
532+
context.execute_steps("given a SparkplugBWriter controller service is set up in the \"minifi-cpp-flow\" flow")
533+
534+
517535
@when("a test Sparkplug payload is published to the topic \"{topic}\"")
518536
def step_impl(context, topic):
519537
context.test.publish_test_sparkplug_payload(topic)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
from ..core.ControllerService import ControllerService
18+
19+
20+
class SparkplugBWriter(ControllerService):
21+
def __init__(self, name=None):
22+
super(SparkplugBWriter, self).__init__(name=name)
23+
self.service_class = 'SparkplugBWriter'

0 commit comments

Comments
 (0)