diff --git a/PROCESSORS.md b/PROCESSORS.md index bec6d178f5..e3d8c66e53 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -27,6 +27,7 @@ limitations under the License. - [ConsumeKafka](#ConsumeKafka) - [ConsumeMQTT](#ConsumeMQTT) - [ConsumeWindowsEventLog](#ConsumeWindowsEventLog) +- [ConvertRecord](#ConvertRecord) - [DefragmentText](#DefragmentText) - [DeleteAzureBlobStorage](#DeleteAzureBlobStorage) - [DeleteAzureDataLakeStorage](#DeleteAzureDataLakeStorage) @@ -474,6 +475,37 @@ In the list below, the names of required properties appear in bold. Any other pr | success | Relationship for successfully consumed events. | +## ConvertRecord + +### Description + +Converts records from one data format to another using configured Record Reader and Record Writer Controller Services. + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|-----------------------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data | +| **Record Writer** | | | Specifies the Controller Service to use for writing out the records | +| **Include Zero Record FlowFiles** | true | true<br/>false | When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship. | + +### Relationships + +| Name | Description | +|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship | +| success | FlowFiles that are successfully transformed will be routed to this relationship | + +### Output Attributes + +| Attribute | Relationship | Description | +|----------------------|--------------|-------------------------------------------------------------------------------------------| +| record.count | success | The number of records in the FlowFile | +| record.error.message | failure | This attribute provides on failure the error message encountered by the Reader or Writer. 
| + + ## DefragmentText ### Description diff --git a/docker/test/integration/features/core_functionality.feature b/docker/test/integration/features/core_functionality.feature index ee5290ec88..564cf45413 100644 --- a/docker/test/integration/features/core_functionality.feature +++ b/docker/test/integration/features/core_functionality.feature @@ -123,3 +123,18 @@ Feature: Core flow functionalities | config_format | | yaml | | json | + + @CORE + Scenario: ConvertRecord processor can convert records from one format to another + Given a XMLReader controller service is set up + And a JsonRecordSetWriter controller service is set up with "Array" output grouping + And a GenerateFlowFile processor with the "Data Format" property set to "Text" + And the "Custom Text" property of the GenerateFlowFile processor is set to "12" + And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" + And a ConvertRecord processor with the "Record Reader" property set to "XMLReader" + And the "Record Writer" property of the ConvertRecord processor is set to "JsonRecordSetWriter" + And the "success" relationship of the GenerateFlowFile processor is connected to the ConvertRecord + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the ConvertRecord processor is connected to the PutFile + When the MiNiFi instance starts up + Then a flowfile with the JSON content '[{"numbers":[1,2]}]' is placed in the monitored directory in less than 60 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index b624ecfcf5..7111f317cc 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -26,6 +26,7 @@ from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter from minifi.controllers.JsonTreeReader import JsonTreeReader from minifi.controllers.CouchbaseClusterService import 
CouchbaseClusterService +from minifi.controllers.XMLReader import XMLReader from behave import given, then, when from behave.model_describe import ModelDescriptor @@ -405,6 +406,13 @@ def step_impl(context): container.add_controller(json_record_set_reader) +@given("a XMLReader controller service is set up") +def step_impl(context): + xml_reader = XMLReader("XMLReader") + container = context.test.acquire_container(context=context, name="minifi-cpp-flow") + container.add_controller(xml_reader) + + # Kubernetes def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties): kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties) diff --git a/docker/test/integration/minifi/controllers/XMLReader.py b/docker/test/integration/minifi/controllers/XMLReader.py new file mode 100644 index 0000000000..9873491029 --- /dev/null +++ b/docker/test/integration/minifi/controllers/XMLReader.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from ..core.ControllerService import ControllerService + + +class XMLReader(ControllerService): + def __init__(self, name=None): + super(XMLReader, self).__init__(name=name) + self.service_class = 'XMLReader' diff --git a/docker/test/integration/minifi/processors/ConvertRecord.py b/docker/test/integration/minifi/processors/ConvertRecord.py new file mode 100644 index 0000000000..164648eb97 --- /dev/null +++ b/docker/test/integration/minifi/processors/ConvertRecord.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from ..core.Processor import Processor + + +class ConvertRecord(Processor): + def __init__(self, context): + super(ConvertRecord, self).__init__( + context=context, + clazz='ConvertRecord', + auto_terminate=['success', 'failure']) diff --git a/extensions/standard-processors/processors/ConvertRecord.cpp b/extensions/standard-processors/processors/ConvertRecord.cpp new file mode 100644 index 0000000000..fb85990c1d --- /dev/null +++ b/extensions/standard-processors/processors/ConvertRecord.cpp @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ConvertRecord.h" + +#include "core/Resource.h" +#include "nonstd/expected.hpp" +#include "utils/GeneralUtils.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::processors { + +void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + record_set_reader_ = utils::parseControllerService(context, RecordReader, getUUID()); + record_set_writer_ = utils::parseControllerService(context, RecordWriter, getUUID()); + include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles); +} + +void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + gsl_Expects(record_set_reader_ && record_set_writer_); + const auto flow_file = session.get(); + if (!flow_file) { + context.yield(); + return; + } + + nonstd::expected record_set; + session.read(flow_file, [this, &record_set](const std::shared_ptr& input_stream) { + record_set = record_set_reader_->read(*input_stream); + return gsl::narrow(input_stream->size()); + }); + if (!record_set) { + logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message()); + flow_file->setAttribute(processors::ConvertRecord::RecordErrorMessageOutputAttribute.name, 
record_set.error().message()); + session.transfer(flow_file, Failure); + return; + } + + if (!include_zero_record_flow_files_ && record_set->empty()) { + logger_->log_info("No records found in flow file, removing flow file"); + session.remove(flow_file); + return; + } + + record_set_writer_->write(*record_set, flow_file, session); + flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name, std::to_string(record_set->size())); + session.transfer(flow_file, Success); +} + +REGISTER_RESOURCE(ConvertRecord, Processor); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/ConvertRecord.h b/extensions/standard-processors/processors/ConvertRecord.h new file mode 100644 index 0000000000..7247c3ed37 --- /dev/null +++ b/extensions/standard-processors/processors/ConvertRecord.h @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include +#include +#include + +#include "core/AbstractProcessor.h" +#include "core/ProcessSession.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/RelationshipDefinition.h" +#include "controllers/RecordSetReader.h" +#include "controllers/RecordSetWriter.h" + +namespace org::apache::nifi::minifi::processors { + +class ConvertRecord : public core::AbstractProcessor { + public: + using core::AbstractProcessor::AbstractProcessor; + + EXTENSIONAPI static constexpr const char* Description = "Converts records from one data format to another using configured Record Reader and Record Writer Controller Services."; + + EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader") + .withDescription("Specifies the Controller Service to use for reading incoming data") + .isRequired(true) + .withAllowedTypes() + .build(); + EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer") + .withDescription("Specifies the Controller Service to use for writing out the records") + .isRequired(true) + .withAllowedTypes() + .build(); + EXTENSIONAPI static constexpr auto IncludeZeroRecordFlowFiles = core::PropertyDefinitionBuilder<>::createProperty("Include Zero Record FlowFiles") + .withDescription("When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship.") + .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) + .withDefaultValue("true") + .isRequired(true) + .build(); + + EXTENSIONAPI static constexpr auto Properties = std::to_array({ + RecordReader, + RecordWriter, + IncludeZeroRecordFlowFiles + }); + + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", + "If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged 
FlowFile will be routed to this relationship"}; + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are successfully transformed will be routed to this relationship"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Success}; + + EXTENSIONAPI static constexpr auto RecordCountOutputAttribute = core::OutputAttributeDefinition<1>{"record.count", {Success}, "The number of records in the FlowFile"}; + EXTENSIONAPI static constexpr auto RecordErrorMessageOutputAttribute = core::OutputAttributeDefinition<1>{"record.error.message", {Failure}, + "This attribute provides on failure the error message encountered by the Reader or Writer."}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::array{RecordCountOutputAttribute, RecordErrorMessageOutputAttribute}; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + + private: + std::shared_ptr record_set_reader_; + std::shared_ptr record_set_writer_; + bool include_zero_record_flow_files_ = true; +}; + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/ConvertRecordTests.cpp b/extensions/standard-processors/tests/unit/ConvertRecordTests.cpp new file mode 100644 index 0000000000..078986a0e4 --- /dev/null +++ b/extensions/standard-processors/tests/unit/ConvertRecordTests.cpp @@ -0,0 +1,103 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "unit/Catch.h" +#include "unit/TestBase.h" +#include "../controllers/XMLReader.h" +#include "../controllers/JsonRecordSetWriter.h" +#include "unit/SingleProcessorTestController.h" +#include "../processors/ConvertRecord.h" +#include "utils/StringUtils.h" +#include "unit/ProcessorUtils.h" + +namespace org::apache::nifi::minifi::test { + +TEST_CASE("ConvertRecord scheduling fails with invalid reader and writer", "[ConvertRecord]") { + LogTestController::getInstance().setTrace(); + SingleProcessorTestController controller(utils::make_processor("ConvertRecord")); + controller.plan->addController("XMLReader", "XMLReader"); + + REQUIRE_THROWS_WITH(controller.trigger(minifi::test::InputFlowFileData{""}), "Process Schedule Operation: Required controller service property 'Record Reader' is missing"); + + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordReader.name, "XMLReader")); + REQUIRE_THROWS_WITH(controller.trigger(minifi::test::InputFlowFileData{""}), "Process Schedule Operation: Required controller service property 'Record Writer' is missing"); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordWriter.name, "XMLWriter")); + REQUIRE_THROWS_WITH(controller.trigger(minifi::test::InputFlowFileData{""}), "Process 
Schedule Operation: Controller service 'Record Writer' = 'XMLWriter' not found"); +} + +TEST_CASE("Record conversion fails with read failure", "[ConvertRecord]") { + LogTestController::getInstance().setTrace(); + SingleProcessorTestController controller(utils::make_processor("ConvertRecord")); + controller.plan->addController("XMLReader", "XMLReader"); + controller.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter"); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordReader.name, "XMLReader")); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordWriter.name, "JsonRecordSetWriter")); + + auto results = controller.trigger(minifi::test::InputFlowFileData{""}); + REQUIRE(results.at(processors::ConvertRecord::Failure).size() == 1); + auto& output_flow_file = results.at(processors::ConvertRecord::Failure)[0]; + CHECK(controller.plan->getContent(output_flow_file) == ""); + auto error_message_attribute = minifi::utils::string::toLower(*output_flow_file->getAttribute(processors::ConvertRecord::RecordErrorMessageOutputAttribute.name)); + CHECK(error_message_attribute == "invalid argument"); + CHECK(LogTestController::getInstance().contains("Failed to read record set from flow file")); +} + +TEST_CASE("Record conversion succeeds with a single record", "[ConvertRecord]") { + SingleProcessorTestController controller(utils::make_processor("ConvertRecord")); + controller.plan->addController("XMLReader", "XMLReader"); + controller.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter"); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordReader.name, "XMLReader")); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordWriter.name, "JsonRecordSetWriter")); + + auto results = controller.trigger(minifi::test::InputFlowFileData{"value"}); + REQUIRE(results.at(processors::ConvertRecord::Success).size() == 1); + auto& output_flow_file = 
results.at(processors::ConvertRecord::Success)[0]; + CHECK(*output_flow_file->getAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name) == "1"); + CHECK(controller.plan->getContent(output_flow_file) == R"([{"field":"value"}])"); +} + +TEST_CASE("Empty flow files are not transferred when Include Zero Record Flow Files is false", "[ConvertRecord]") { + SingleProcessorTestController controller(utils::make_processor("ConvertRecord")); + controller.plan->addController("XMLReader", "XMLReader"); + controller.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter"); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordReader.name, "XMLReader")); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordWriter.name, "JsonRecordSetWriter")); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::IncludeZeroRecordFlowFiles.name, "false")); + + auto results = controller.trigger(minifi::test::InputFlowFileData{""}); + REQUIRE(results.at(processors::ConvertRecord::Success).empty()); + REQUIRE(results.at(processors::ConvertRecord::Failure).empty()); +} + +TEST_CASE("Empty flow files are transferred when Include Zero Record Flow Files is true", "[ConvertRecord]") { + SingleProcessorTestController controller(utils::make_processor("ConvertRecord")); + controller.plan->addController("XMLReader", "XMLReader"); + controller.plan->addController("JsonRecordSetWriter", "JsonRecordSetWriter"); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordReader.name, "XMLReader")); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::RecordWriter.name, "JsonRecordSetWriter")); + REQUIRE(controller.getProcessor()->setProperty(processors::ConvertRecord::IncludeZeroRecordFlowFiles.name, "true")); + + auto results = controller.trigger(minifi::test::InputFlowFileData{""}); + REQUIRE(results.at(processors::ConvertRecord::Success).size() == 1); + 
REQUIRE(results.at(processors::ConvertRecord::Failure).empty()); + auto& output_flow_file = results.at(processors::ConvertRecord::Success)[0]; + CHECK(*output_flow_file->getAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name) == "0"); + CHECK(controller.plan->getContent(output_flow_file) == "[]"); +} + +} // namespace org::apache::nifi::minifi::test