15 changes: 15 additions & 0 deletions CONTROLLERS.md
@@ -30,6 +30,7 @@ limitations under the License.
- [RocksDbStateStorage](#RocksDbStateStorage)
- [SmbConnectionControllerService](#SmbConnectionControllerService)
- [SparkplugBReader](#SparkplugBReader)
- [SparkplugBWriter](#SparkplugBWriter)
- [SSLContextService](#SSLContextService)
- [UpdatePolicyControllerService](#UpdatePolicyControllerService)
- [VolatileMapStateStorage](#VolatileMapStateStorage)
@@ -295,6 +296,20 @@ In the list below, the names of required properties appear in bold. Any other pr
|------|---------------|------------------|-------------|


## SparkplugBWriter

### Description

Serializes a record set into Sparkplug B messages and writes them into a FlowFile. This writer is typically used with MQTT processors such as PublishMQTT.

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|------|---------------|------------------|-------------|
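The record fields this writer consumes mirror the Sparkplug B protobuf `Payload` message. As a minimal sketch, the record below uses the same shape as the integration test payload in this change set; field names such as `int_value` are taken from that payload, not from a full schema:

```python
# A record in the shape used by the SparkplugBWriter integration test.
# Field names mirror the Sparkplug B protobuf Payload message
# (timestamp, metrics, seq, uuid, body). Illustrative sketch only.
record = {
    "timestamp": 987654321,  # payload-level timestamp
    "seq": 12345,            # Sparkplug sequence number
    "uuid": "test-uuid",
    "body": "test-body",
    "metrics": [
        {
            "name": "TestMetric",
            "timestamp": 45345346346,  # metric-level timestamp
            "int_value": 123,          # integer-typed metric value
        }
    ],
}

print(record["metrics"][0]["name"])
```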


## SSLContextService

### Description
2 changes: 1 addition & 1 deletion README.md
@@ -85,7 +85,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
| Kubernetes (Linux) | [KubernetesControllerService](CONTROLLERS.md#kubernetescontrollerservice) | -DENABLE_KUBERNETES=ON |
| LlamaCpp | [RunLlamaCppInference](PROCESSORS.md#runllamacppinference) | -DENABLE_LLAMACPP=ON |
| Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) | -DENABLE_LUA_SCRIPTING=ON |
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader) | -DENABLE_MQTT=ON |
| MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt)<br/>[SparkplugBReader](PROCESSORS.md#sparkplugbreader)<br/>[SparkplugBWriter](PROCESSORS.md#sparkplugbwriter) | -DENABLE_MQTT=ON |
| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) | -DENABLE_OPC=ON |
| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) | -DENABLE_OPENCV=ON |
| PDH (Windows) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) | -DENABLE_PDH=ON |
36 changes: 36 additions & 0 deletions docker/test/integration/features/mqtt.feature
@@ -579,3 +579,39 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds

Scenario: A MiNiFi instance publishes then consumes Sparkplug message through MQTT broker
Given an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT

And a SparkplugBReader controller service is set up in the "consumer-client" flow
And a JsonRecordSetWriter controller service is set up with "Array" output grouping in the "consumer-client" flow
And a SparkplugBWriter controller service is set up in the "publisher-client" flow
And a JsonTreeReader controller service is set up in the "publisher-client" flow

And a file with the content '{"timestamp":987654321,"metrics":[{"int_value":123,"timestamp":45345346346,"name":"TestMetric"}],"seq":12345,"body":"test-body","uuid":"test-uuid"}' is present in '/tmp/input'

And a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow
And a PublishMQTT processor in the "publisher-client" flow
And the "Topic" property of the PublishMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
And the "MQTT Version" property of the PublishMQTT processor is set to "3.1.1"
And the "Record Reader" property of the PublishMQTT processor is set to "JsonTreeReader"
And the "Record Writer" property of the PublishMQTT processor is set to "SparkplugBWriter"

And the "success" relationship of the GetFile processor is connected to the PublishMQTT

And a ConsumeMQTT processor in the "consumer-client" flow
And the "Topic" property of the ConsumeMQTT processor is set to "spBv1.0/TestGroup/DDATA/TestNode/TestDevice"
And the "MQTT Version" property of the ConsumeMQTT processor is set to "3.1.1"
And the "Record Reader" property of the ConsumeMQTT processor is set to "SparkplugBReader"
And the "Record Writer" property of the ConsumeMQTT processor is set to "JsonRecordSetWriter"
And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow
And a LogAttribute processor with the "Log Payload" property set to "true" in the "consumer-client" flow
And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When all instances start up

Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client"
And a flowfile with the JSON content '[{"_isDuplicate": false, "_qos": 0, "_topicSegments": ["spBv1.0", "TestGroup", "DDATA", "TestNode", "TestDevice"], "_topic": "spBv1.0/TestGroup/DDATA/TestNode/TestDevice", "_isRetained": false, "body": "test-body", "uuid": "test-uuid", "seq": 12345, "metrics": [{"int_value": 123, "timestamp": 45345346346, "name": "TestMetric"}], "timestamp": 987654321}]' is placed in the monitored directory in less than 60 seconds
And the Minifi logs contain the following message: "key:record.count value:1" in less than 60 seconds
And the Minifi logs contain the following message: "key:mqtt.broker value:mqtt-broker-" in less than 1 seconds
24 changes: 21 additions & 3 deletions docker/test/integration/features/steps/steps.py
@@ -30,6 +30,7 @@
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
from minifi.controllers.XMLReader import XMLReader
from minifi.controllers.SparkplugBReader import SparkplugBReader
from minifi.controllers.SparkplugBWriter import SparkplugBWriter

from behave import given, then, when
from behave.model_describe import ModelDescriptor
@@ -507,13 +508,30 @@ def step_impl(context):
context.test.start('mqtt-broker')


@given("a SparkplugBReader controller service is set up")
def step_impl(context):
@given("a SparkplugBReader controller service is set up in the \"{minifi_container_name}\" flow")
def step_impl(context, minifi_container_name: str):
sparkplug_record_set_reader = SparkplugBReader("SparkplugBReader")
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container = context.test.acquire_container(context=context, name=minifi_container_name)
container.add_controller(sparkplug_record_set_reader)


@given("a SparkplugBWriter controller service is set up in the \"{minifi_container_name}\" flow")
def step_impl(context, minifi_container_name: str):
sparkplug_record_set_writer = SparkplugBWriter("SparkplugBWriter")
container = context.test.acquire_container(context=context, name=minifi_container_name)
container.add_controller(sparkplug_record_set_writer)


@given("a SparkplugBReader controller service is set up")
def step_impl(context):
context.execute_steps("given a SparkplugBReader controller service is set up in the \"minifi-cpp-flow\" flow")


@given("a SparkplugBWriter controller service is set up")
def step_impl(context):
context.execute_steps("given a SparkplugBWriter controller service is set up in the \"minifi-cpp-flow\" flow")


@when("a test Sparkplug payload is published to the topic \"{topic}\"")
def step_impl(context, topic):
context.test.publish_test_sparkplug_payload(topic)
23 changes: 23 additions & 0 deletions docker/test/integration/minifi/controllers/SparkplugBWriter.py
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from ..core.ControllerService import ControllerService


class SparkplugBWriter(ControllerService):
def __init__(self, name=None):
super(SparkplugBWriter, self).__init__(name=name)
self.service_class = 'SparkplugBWriter'
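Like the existing controller-service wrappers in this test framework, the new class only carries a name and a `service_class` string. A self-contained sketch below shows how a test would instantiate and register it; `ControllerService` and `Container` here are hypothetical stubs standing in for the framework's real classes:

```python
# Hypothetical stub for minifi.core.ControllerService, with just enough
# behavior to show how SparkplugBWriter is constructed.
class ControllerService:
    def __init__(self, name=None):
        self.name = name


class SparkplugBWriter(ControllerService):
    def __init__(self, name=None):
        super().__init__(name=name)
        self.service_class = 'SparkplugBWriter'


class Container:
    """Hypothetical stand-in for the test framework's container object."""
    def __init__(self):
        self.controllers = []

    def add_controller(self, controller):
        self.controllers.append(controller)


# Mirrors the registration done in the SparkplugBWriter step definition.
container = Container()
container.add_controller(SparkplugBWriter("SparkplugBWriter"))
print(container.controllers[0].service_class)
```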