Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions PROCESSORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.
- [ConsumeKafka](#ConsumeKafka)
- [ConsumeMQTT](#ConsumeMQTT)
- [ConsumeWindowsEventLog](#ConsumeWindowsEventLog)
- [ConvertRecord](#ConvertRecord)
- [DefragmentText](#DefragmentText)
- [DeleteAzureBlobStorage](#DeleteAzureBlobStorage)
- [DeleteAzureDataLakeStorage](#DeleteAzureDataLakeStorage)
Expand Down Expand Up @@ -474,6 +475,37 @@ In the list below, the names of required properties appear in bold. Any other pr
| success | Relationship for successfully consumed events. |


## ConvertRecord

### Description

Converts records from one data format to another using configured Record Reader and Record Writer Controller Services.

### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|-----------------------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data |
| **Record Writer** | | | Specifies the Controller Service to use for writing out the records |
| **Include Zero Record FlowFiles** | true | true<br/>false | When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship. |

### Relationships

| Name | Description |
|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship |
| success | FlowFiles that are successfully transformed will be routed to this relationship |

### Output Attributes

| Attribute | Relationship | Description |
|----------------------|--------------|-------------------------------------------------------------------------------------------|
| record.count | success | The number of records in the FlowFile |
| record.error.message | failure | This attribute provides on failure the error message encountered by the Reader or Writer. |


## DefragmentText

### Description
Expand Down
15 changes: 15 additions & 0 deletions docker/test/integration/features/core_functionality.feature
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,18 @@ Feature: Core flow functionalities
| config_format |
| yaml |
| json |

@CORE
Scenario: ConvertRecord processor can convert records from one format to another
Given a XMLReader controller service is set up
And a JsonRecordSetWriter controller service is set up with "Array" output grouping
And a GenerateFlowFile processor with the "Data Format" property set to "Text"
And the "Custom Text" property of the GenerateFlowFile processor is set to "<record><numbers>1</numbers><numbers>2</numbers></record>"
And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false"
And a ConvertRecord processor with the "Record Reader" property set to "XMLReader"
And the "Record Writer" property of the ConvertRecord processor is set to "JsonRecordSetWriter"
And the "success" relationship of the GenerateFlowFile processor is connected to the ConvertRecord
And a PutFile processor with the "Directory" property set to "/tmp/output"
And the "success" relationship of the ConvertRecord processor is connected to the PutFile
When the MiNiFi instance starts up
Then a flowfile with the JSON content '[{"numbers":[1,2]}]' is placed in the monitored directory in less than 60 seconds
8 changes: 8 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
from minifi.controllers.JsonTreeReader import JsonTreeReader
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
from minifi.controllers.XMLReader import XMLReader

from behave import given, then, when
from behave.model_describe import ModelDescriptor
Expand Down Expand Up @@ -405,6 +406,13 @@ def step_impl(context):
container.add_controller(json_record_set_reader)


@given("a XMLReader controller service is set up")
def step_impl(context):
xml_reader = XMLReader("XMLReader")
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(xml_reader)


# Kubernetes
def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
Expand Down
21 changes: 21 additions & 0 deletions docker/test/integration/minifi/controllers/XMLReader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.ControllerService import ControllerService


class XMLReader(ControllerService):
def __init__(self, name=None):
super(XMLReader, self).__init__(name=name)
self.service_class = 'XMLReader'
23 changes: 23 additions & 0 deletions docker/test/integration/minifi/processors/ConvertRecord.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.Processor import Processor


class ConvertRecord(Processor):
def __init__(self, context):
super(ConvertRecord, self).__init__(
context=context,
clazz='ConvertRecord',
auto_terminate=['success', 'failure'])
65 changes: 65 additions & 0 deletions extensions/standard-processors/processors/ConvertRecord.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConvertRecord.h"

#include "core/Resource.h"
#include "nonstd/expected.hpp"
#include "utils/GeneralUtils.h"
#include "utils/ProcessorConfigUtils.h"

namespace org::apache::nifi::minifi::processors {

void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
record_set_reader_ = utils::parseControllerService<core::RecordSetReader>(context, RecordReader, getUUID());
record_set_writer_ = utils::parseControllerService<core::RecordSetWriter>(context, RecordWriter, getUUID());
include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles);
}

void ConvertRecord::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
gsl_Expects(record_set_reader_ && record_set_writer_);
const auto flow_file = session.get();
if (!flow_file) {
context.yield();
return;
}

nonstd::expected<core::RecordSet, std::error_code> record_set;
session.read(flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
record_set = record_set_reader_->read(*input_stream);
return gsl::narrow<int64_t>(input_stream->size());
});
if (!record_set) {
logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message());
flow_file->setAttribute(processors::ConvertRecord::RecordErrorMessageOutputAttribute.name, record_set.error().message());
session.transfer(flow_file, Failure);
return;
}

if (!include_zero_record_flow_files_ && record_set->empty()) {
logger_->log_info("No records found in flow file, removing flow file");
session.remove(flow_file);
return;
}

record_set_writer_->write(*record_set, flow_file, session);
flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name, std::to_string(record_set->size()));
session.transfer(flow_file, Success);
}

REGISTER_RESOURCE(ConvertRecord, Processor);

} // namespace org::apache::nifi::minifi::processors
85 changes: 85 additions & 0 deletions extensions/standard-processors/processors/ConvertRecord.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <memory>
#include <string_view>
#include <utility>

#include "core/AbstractProcessor.h"
#include "core/ProcessSession.h"
#include "core/PropertyDefinitionBuilder.h"
#include "core/RelationshipDefinition.h"
#include "controllers/RecordSetReader.h"
#include "controllers/RecordSetWriter.h"

namespace org::apache::nifi::minifi::processors {

class ConvertRecord : public core::AbstractProcessor<ConvertRecord> {
public:
using core::AbstractProcessor<ConvertRecord>::AbstractProcessor;

EXTENSIONAPI static constexpr const char* Description = "Converts records from one data format to another using configured Record Reader and Record Writer Controller Services.";

EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
.withDescription("Specifies the Controller Service to use for reading incoming data")
.isRequired(true)
.withAllowedTypes<minifi::core::RecordSetReader>()
.build();
EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
.withDescription("Specifies the Controller Service to use for writing out the records")
.isRequired(true)
.withAllowedTypes<minifi::core::RecordSetWriter>()
.build();
EXTENSIONAPI static constexpr auto IncludeZeroRecordFlowFiles = core::PropertyDefinitionBuilder<>::createProperty("Include Zero Record FlowFiles")
.withDescription("When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship.")
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
.withDefaultValue("true")
.isRequired(true)
.build();

EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
RecordReader,
RecordWriter,
IncludeZeroRecordFlowFiles
});

EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
"If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship"};
EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are successfully transformed will be routed to this relationship"};
EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Success};

EXTENSIONAPI static constexpr auto RecordCountOutputAttribute = core::OutputAttributeDefinition<1>{"record.count", {Success}, "The number of records in the FlowFile"};
EXTENSIONAPI static constexpr auto RecordErrorMessageOutputAttribute = core::OutputAttributeDefinition<1>{"record.error.message", {Failure},
"This attribute provides on failure the error message encountered by the Reader or Writer."};
EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{RecordCountOutputAttribute, RecordErrorMessageOutputAttribute};

EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
EXTENSIONAPI static constexpr bool IsSingleThreaded = false;

void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;

private:
std::shared_ptr<core::RecordSetReader> record_set_reader_;
std::shared_ptr<core::RecordSetWriter> record_set_writer_;
bool include_zero_record_flow_files_ = true;
};

} // namespace org::apache::nifi::minifi::processors
Loading
Loading