Commit a537afe

MINIFICPP-2595 Create ConvertRecord processor
1 parent e6e8d8a commit a537afe

8 files changed: +366 -0 lines changed

PROCESSORS.md

Lines changed: 32 additions & 0 deletions
@@ -27,6 +27,7 @@ limitations under the License.
 - [ConsumeKafka](#ConsumeKafka)
 - [ConsumeMQTT](#ConsumeMQTT)
 - [ConsumeWindowsEventLog](#ConsumeWindowsEventLog)
+- [ConvertRecord](#ConvertRecord)
 - [DefragmentText](#DefragmentText)
 - [DeleteAzureBlobStorage](#DeleteAzureBlobStorage)
 - [DeleteAzureDataLakeStorage](#DeleteAzureDataLakeStorage)
@@ -482,6 +483,37 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | Relationship for successfully consumed events. |


+## ConvertRecord
+
+### Description
+
+Converts records from one data format to another using configured Record Reader and Record Writer Controller Services.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|-----------------------------------|---------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Record Reader** | | | Specifies the Controller Service to use for reading incoming data |
+| **Record Writer** | | | Specifies the Controller Service to use for writing out the records |
+| **Include Zero Record FlowFiles** | true | true<br/>false | When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship. |
+
+### Relationships
+
+| Name | Description |
+|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship |
+| success | FlowFiles that are successfully transformed will be routed to this relationship |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|----------------------|--------------|-------------------------------------------------------------------------------------------|
+| record.count | success | The number of records in the FlowFile |
+| record.error.message | failure | This attribute provides on failure the error message encountered by the Reader or Writer. |
+
+
 ## DefragmentText

 ### Description
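
The property table added to PROCESSORS.md above can be read as a small validation contract: two required controller service names, plus one boolean flag that defaults to `true`. The following is a minimal Python sketch of that contract only. The function name `resolve_properties` and the dictionary-based configuration are hypothetical and are not part of MiNiFi, which performs this validation in its own C++ property framework.

```python
# Hypothetical model of the documented ConvertRecord properties.
REQUIRED_PROPERTIES = ("Record Reader", "Record Writer")
DEFAULT_VALUES = {"Include Zero Record FlowFiles": "true"}


def resolve_properties(configured):
    """Apply documented defaults and reject missing or malformed values."""
    missing = [name for name in REQUIRED_PROPERTIES if not configured.get(name)]
    if missing:
        raise ValueError("missing required properties: " + ", ".join(missing))

    resolved = {**DEFAULT_VALUES, **configured}
    flag = resolved["Include Zero Record FlowFiles"]
    if flag not in ("true", "false"):
        raise ValueError("Include Zero Record FlowFiles must be 'true' or 'false'")
    # Store the flag as a real boolean for later use.
    resolved["Include Zero Record FlowFiles"] = flag == "true"
    return resolved
```

Called with only the two service names, the sketch fills in the documented default for the flag; leaving out either service name raises an error, mirroring the "required" marking in the table.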

docker/test/integration/features/core_functionality.feature

Lines changed: 15 additions & 0 deletions
@@ -123,3 +123,18 @@ Feature: Core flow functionalities
       | config_format |
       | yaml |
       | json |
+
+  @CORE
+  Scenario: ConvertRecord processor can convert records from one format to another
+    Given a XMLReader controller service is set up
+    And a JsonRecordSetWriter controller service is set up with "Array" output grouping
+    And a GenerateFlowFile processor with the "Data Format" property set to "Text"
+    And the "Custom Text" property of the GenerateFlowFile processor is set to "<record><numbers>1</numbers><numbers>2</numbers></record>"
+    And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false"
+    And a ConvertRecord processor with the "Record Reader" property set to "XMLReader"
+    And the "Record Writer" property of the ConvertRecord processor is set to "JsonRecordSetWriter"
+    And the "success" relationship of the GenerateFlowFile processor is connected to the ConvertRecord
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the ConvertRecord processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then a flowfile with the JSON content '[{"numbers":[1,2]}]' is placed in the monitored directory in less than 60 seconds
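
To make the expected result of the scenario above concrete, here is a rough Python illustration of how the XML input could map to the asserted JSON array. It is a simplified stand-in built on the standard library, not the logic of MiNiFi's XMLReader or JsonRecordSetWriter. The helper names and the rules "repeated child tags become a list" and "integer-looking text becomes a number" are assumptions chosen to match this one test case.

```python
import json
import xml.etree.ElementTree as ET


def _to_scalar(text):
    """Assumed type inference: integers where possible, otherwise the raw text."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return text


def xml_record_to_dict(xml_text):
    """Turn the children of the root element into one record (hypothetical helper)."""
    record = {}
    for child in ET.fromstring(xml_text):
        value = _to_scalar(child.text)
        if child.tag not in record:
            record[child.tag] = value
        elif isinstance(record[child.tag], list):
            record[child.tag].append(value)
        else:
            # A second occurrence of the same tag promotes the field to a list.
            record[child.tag] = [record[child.tag], value]
    return record


def convert_xml_to_json_array(xml_text):
    """Emit a compact JSON array holding one record, like "Array" output grouping."""
    return json.dumps([xml_record_to_dict(xml_text)], separators=(",", ":"))


print(convert_xml_to_json_array("<record><numbers>1</numbers><numbers>2</numbers></record>"))
# prints [{"numbers":[1,2]}]
```

The two `<numbers>` elements collapse into one array-valued field, which is why the scenario expects a single record rather than two.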

docker/test/integration/features/steps/steps.py

Lines changed: 8 additions & 0 deletions
@@ -27,6 +27,7 @@
 from minifi.controllers.JsonTreeReader import JsonTreeReader
 from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
 from minifi.controllers.SparkplugBReader import SparkplugBReader
+from minifi.controllers.XMLReader import XMLReader

 from behave import given, then, when
 from behave.model_describe import ModelDescriptor
@@ -406,6 +407,13 @@ def step_impl(context):
     container.add_controller(json_record_set_reader)


+@given("a XMLReader controller service is set up")
+def step_impl(context):
+    xml_reader = XMLReader("XMLReader")
+    container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+    container.add_controller(xml_reader)
+
+
 # Kubernetes
 def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
     kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties)
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.ControllerService import ControllerService
+
+
+class XMLReader(ControllerService):
+    def __init__(self, name=None):
+        super(XMLReader, self).__init__(name=name)
+        self.service_class = 'XMLReader'
Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class ConvertRecord(Processor):
+    def __init__(self, context):
+        super(ConvertRecord, self).__init__(
+            context=context,
+            clazz='ConvertRecord',
+            auto_terminate=['success', 'failure'])
Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ConvertRecord.h"
+
+#include "core/Resource.h"
+#include "nonstd/expected.hpp"
+#include "utils/GeneralUtils.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+namespace {
+template<typename RecordSetIO>
+std::shared_ptr<RecordSetIO> getRecordSetIO(core::ProcessContext& context, const core::PropertyReference& property, const utils::Identifier& processor_uuid) {
+  std::string service_name = context.getProperty(property).value_or("");
+  if (!IsNullOrEmpty(service_name)) {
+    auto record_set_io = std::dynamic_pointer_cast<RecordSetIO>(context.getControllerService(service_name, processor_uuid));
+    if (!record_set_io)
+      return nullptr;
+    return record_set_io;
+  }
+  return nullptr;
+}
+}  // namespace
+
+void ConvertRecord::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+  record_set_reader_ = getRecordSetIO<core::RecordSetReader>(context, RecordReader, getUUID());
+  if (!record_set_reader_) {
+    throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Reader property is missing or invalid");
+  }
+  record_set_writer_ = getRecordSetIO<core::RecordSetWriter>(context, RecordWriter, getUUID());
+  if (!record_set_writer_) {
+    throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Record Writer property is missing or invalid");
+  }
+  include_zero_record_flow_files_ = utils::parseBoolProperty(context, IncludeZeroRecordFlowFiles);
+}
+
+void ConvertRecord::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
+  const auto flow_file = session.get();
+  if (!flow_file) {
+    yield();
+    return;
+  }
+
+  nonstd::expected<core::RecordSet, std::error_code> record_set;
+  session.read(flow_file, [this, &record_set](const std::shared_ptr<io::InputStream>& input_stream) {
+    record_set = record_set_reader_->read(*input_stream);
+    return gsl::narrow<int64_t>(input_stream->size());
+  });
+  if (!record_set) {
+    logger_->log_error("Failed to read record set from flow file: {}", record_set.error().message());
+    flow_file->setAttribute(processors::ConvertRecord::RecordErrorMessageOutputAttribute.name, record_set.error().message());
+    session.transfer(flow_file, Failure);
+    return;
+  }
+
+  if (!include_zero_record_flow_files_ && record_set->empty()) {
+    logger_->log_info("No records found in flow file, removing flow file");
+    session.remove(flow_file);
+    return;
+  }
+
+  record_set_writer_->write(*record_set, flow_file, session);
+  flow_file->setAttribute(processors::ConvertRecord::RecordCountOutputAttribute.name, std::to_string(record_set->size()));
+  session.transfer(flow_file, Success);
+}
+
+REGISTER_RESOURCE(ConvertRecord, Processor);
+
+}  // namespace org::apache::nifi::minifi::processors
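
The control flow of `onTrigger` above has three outcomes: route to `failure` with `record.error.message`, drop the FlowFile when it holds no records and zero-record FlowFiles are excluded, or write the records and route to `success` with `record.count`. The Python model below is a sketch of that decision logic only. The `FlowFile` dataclass, the callable reader and writer, and the use of `ValueError` in place of the C++ `std::error_code` are all assumptions made for illustration.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical stand-in for a MiNiFi FlowFile: content plus attributes."""
    content: bytes
    attributes: dict = field(default_factory=dict)


def convert_record(flow_file, read_records, write_records, include_zero_record_flow_files=True):
    """Return (relationship, flow_file); relationship is None if the FlowFile is removed."""
    try:
        records = read_records(flow_file.content)
    except ValueError as error:
        # Reader failure: content stays unchanged, the error goes into an attribute.
        flow_file.attributes["record.error.message"] = str(error)
        return "failure", flow_file

    if not include_zero_record_flow_files and not records:
        # Mirrors session.remove(flow_file): nothing is routed anywhere.
        return None, flow_file

    flow_file.content = write_records(records)
    flow_file.attributes["record.count"] = str(len(records))
    return "success", flow_file
```

With the default `include_zero_record_flow_files=True`, an empty record set still reaches the writer and is routed to `success` with `record.count` set to `0`, matching the order of checks in the C++ code.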
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <memory>
+#include <string_view>
+#include <utility>
+
+#include "core/AbstractProcessor.h"
+#include "core/Processor.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/RelationshipDefinition.h"
+#include "controllers/RecordSetReader.h"
+#include "controllers/RecordSetWriter.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+class ConvertRecord : public core::AbstractProcessor<ConvertRecord> {
+ public:
+  using core::AbstractProcessor<ConvertRecord>::AbstractProcessor;
+
+  EXTENSIONAPI static constexpr const char* Description = "Converts records from one data format to another using configured Record Reader and Record Writer Controller Services.";
+
+  EXTENSIONAPI static constexpr auto RecordReader = core::PropertyDefinitionBuilder<>::createProperty("Record Reader")
+      .withDescription("Specifies the Controller Service to use for reading incoming data")
+      .isRequired(true)
+      .withAllowedTypes<minifi::core::RecordSetReader>()
+      .build();
+  EXTENSIONAPI static constexpr auto RecordWriter = core::PropertyDefinitionBuilder<>::createProperty("Record Writer")
+      .withDescription("Specifies the Controller Service to use for writing out the records")
+      .isRequired(true)
+      .withAllowedTypes<minifi::core::RecordSetWriter>()
+      .build();
+  EXTENSIONAPI static constexpr auto IncludeZeroRecordFlowFiles = core::PropertyDefinitionBuilder<>::createProperty("Include Zero Record FlowFiles")
+      .withDescription("When converting an incoming FlowFile, if the conversion results in no data, this property specifies whether or not a FlowFile will be sent to the corresponding relationship.")
+      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
+      .withDefaultValue("true")
+      .isRequired(true)
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
+      RecordReader,
+      RecordWriter,
+      IncludeZeroRecordFlowFiles
+  });
+
+  EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure",
+      "If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship"};
+  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "FlowFiles that are successfully transformed will be routed to this relationship"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Failure, Success};
+
+  EXTENSIONAPI static constexpr auto RecordCountOutputAttribute = core::OutputAttributeDefinition<1>{"record.count", {Success}, "The number of records in the FlowFile"};
+  EXTENSIONAPI static constexpr auto RecordErrorMessageOutputAttribute = core::OutputAttributeDefinition<1>{"record.error.message", {Failure},
+      "This attribute provides on failure the error message encountered by the Reader or Writer."};
+  EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{RecordCountOutputAttribute, RecordErrorMessageOutputAttribute};
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
+  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+
+ private:
+  std::shared_ptr<core::RecordSetReader> record_set_reader_;
+  std::shared_ptr<core::RecordSetWriter> record_set_writer_;
+  bool include_zero_record_flow_files_ = true;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
